对“进程”而言,单纯的是想要实现多任务。 对“线程”而言,主要是为性能考虑。
因为有“阻塞”,转而处理另一个任务,所以效率更高。
首先,Java中关于线程Thread最基本的事实是:
其次,关于Thread对象实例的构造,需要注意,start()方法依赖于run()方法:
朴素的Thread对象,对映单个线程。多个Thread对象,多个线程是可以共存的。但会互相竞争资源。Executor 创建一个“线程池”的概念,对线程统一管理。
以上四个线程池工厂方法返回值面向统一的接口:ExecutorService。 一般情况下,不要使用单独线程。总是首先考虑线程池!
无返回值的启动新线程靠调用execute()方法。多个线程完成任务的时间轴完全是随机的。
有返回值的启动新线程靠调用submit()方法。多个线程间的相对执行顺序也是乱序。返回值类型为Future。
!注意: 因为JVM不保证Future获得返回值的时间。所以,用get()方法获取返回值时,如果返回值还没有计算完成,get()方法是阻塞当前线程的。所以下面4个方法,第一个看上去完全是顺序执行的,后面三个都是乱序。就是因为get()方法等待计算结果完成后才打印结果。
public static void newCalledToFuture(){
ExecutorService ex=Executors.newCachedThreadPool();
try{
List<Future<Integer>> list=new ArrayList<Future<Integer>>();
for(int i=0;i<10;i++){
TestCall called=new TestCall(i+1);
Future<Integer> result=ex.submit(called);
list.add(result);
System.out.println("SUM >>> #"+called.getId()+"("+result.get()+")");
}
}catch(InterruptedException ie){
System.out.println(ie);
}catch(ExecutionException ee){
System.out.println(ee);
}finally{
ex.shutdown();
}
}
public static void newCalled(){
ExecutorService ex=Executors.newCachedThreadPool();
List<Future<Integer>> list=new ArrayList<Future<Integer>>();
for(int i=0;i<10;i++){
TestCall called=new TestCall(i+1);
list.add(ex.submit(called));
System.out.println("SUM >>> #"+called.getId()+" finished!");
}
ex.shutdown();
}
public static void toFuture(){
ExecutorService ex=Executors.newCachedThreadPool();
List<Future<Integer>> list=new ArrayList<Future<Integer>>();
for(int i=0;i<10;i++){
Future<Integer> result=ex.submit(new TestCall(i+1));
list.add(result);
}
ex.shutdown();
}
public static void direct(){
ExecutorService ex=Executors.newCachedThreadPool();
List<Future<Integer>> list=new ArrayList<Future<Integer>>();
for(int i=0;i<10;i++){
list.add(ex.submit(new TestCall(i+1)));
}
ex.shutdown();
}
线程的优先级 不是“死锁”。并不保证高优先级的线程一定在低优先级线程之前执行,而低优先级的线程迟迟得不到执行。
事实是:高优先级的线程只是执行的频率较高而已。
和System.gc()方法类似,yield()方法仅仅是“建议”当前线程可以让给其他线程了。但完全不保证会让位。
关于Daemon Thread后台线程的几个重要事实有:
刚才说过的Executor的四个线程池工厂方法接受ThreadFactory类型作为参数,用来封装Thread的实例化过程。ThreadFactory接口只定义了Thread newThread(Runnable r)一个方法。
!注意: 用ThreadFactory构造出来的Executor,在执行executor(Runnable r)方法时,传入的Runnable对象参数不会直接被传递给ThreadFactory的newThread(Runnable r)方法,而是会先包装成一个Worker。所以想通过Runnable往newThread(Runnable r)方法传递数据是危险的。
书里未捕获异常一部分讲的不是很清楚。 这里稍微解释一下。
如果在Runnable对象的run()方法中抛出异常的话,在run()方法中捕获异常还来得及。
public class UncaughtException{
//Runnable
public static class SuperException implements Runnable{
public void run(){
try{
throw new RuntimeException();
}catch(Exception e){
System.out.println("Bingo! Exception caught!");
}
}
}
//Executor
public static void letsGo(){
ExecutorService es=Executors.newCachedThreadPool();
es.execute(new SuperException());
es.shutdown();
}
//main
public static void main(String[] args){
letsGo();
}
}
如果在Executor里捕获已经捕获不到了。这就是所谓的 “异常逃逸”。
public class UncaughtException{
//Runnable
public static class SuperException implements Runnable{
public void run(){
throw new RuntimeException();
}
}
//Executor
public static void letsGo(){
ExecutorService es=Executors.newCachedThreadPool();
try{
es.execute(new SuperException());
}catch(Exception e){
System.out.println("Bingo! Exception caught!");
}finally{
es.shutdown();
}
}
//main
public static void main(String[] args){
letsGo();
}
}
“异常逃逸”不是说异常就不见了,消失了。其实它还是会冒泡到控制台的。而且自作主张显示在异常报告的第一行。这里的”逃逸”是指异常逃脱了我们try{}catch{}语句对异常的处理。
Exception in thread "pool-1-thread-1" java.lang.RuntimeException
at com.ciaoshen.thinkinjava.chapter21.UncaughtException$SuperException.run(UncaughtException.java:11)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
逃逸的原因很容易猜,因为执行execute()方法的是主线程的Excecutor。而抛出异常的线程池中被分配来执行run()的某线程。JVM的异常处理是各线程只管自己的事。所以同理,就算我们把异常处理套到main()方法的主体中也无法捕获异常。因为始终是在主线程里做动作,这是无法处理其他线程里的异常的。
public class UncaughtException{
//Runnable
public static class SuperException implements Runnable{
public void run(){
throw new RuntimeException();
}
}
//Executor
public static void letsGo(){
ExecutorService es=Executors.newCachedThreadPool();
es.execute(new SuperException());
es.shutdown();
}
//main
public static void main(String[] args){
try{
letsGo();
}catch(Exception e){
System.out.println("Bingo! Exception caught!");
}
}
}
想要捕获的话,就要到执行run()方法的线程里想办法。Java的解决办法是,先创建一个UncaughtExceptionHandler,
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
然后在ThreadFactory里,用setUncaughtExceptionHandler()方法,把这个handler附着在某个Thread上。
class HandlerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
并发最常见也最典型的问题来了:竞争公共资源。道理很简单,厕所只有一个,大家都要上怎么办?。这里大家争夺的“资源”,术语叫 “状态”。其实可以理解为一个“变量”。当有多个线程可以调用某些方法,改变同一变量的状态,就构成了“竟态条件”,可能引起冲突。下面代码就是构成一个最简单的“竞态条件”:我们有一个数字变量。increment()方法每次对这个数加2,正常情况下,输出的数字一直是偶数。但当我们并发多个线程同时执行,线程很可能在执行了一次自增操作后就线程就被挂起。另一个线程接管,因此输出有可能变成奇数。这就是竞态条件下的冲突。
class Mutex implements Runnable{
private volatile int num=0;
public void increment(){
num++;
Thread.yield();
num++;
System.out.println(num);
}
}
最常用的解决方法就是加“独占锁”。用synchronized关键字。
class Mutex implements Runnable{
private volatile int num=0; //“private”禁止外部方法调用
public synchronized void increment(){
num++;
Thread.yield();
num++;
System.out.println(num);
}
}
这里synchronized所做的事情是:声明任何线程如果想要调用increment()方法,必须先获得当前Mutex类实例对象的唯一“独占令牌”,直到increment()方法执行完成,才释放令牌。在此期间,其他所有希望对同一个Mutex对象执行increment()操作的线程,都必须阻塞等候。
!注意: 所以synchronized是一个“悲观锁”,或者叫“阻塞锁”。对象的独占令牌被占用后,其他尝试调用互斥令牌的线程会被阻塞等候。而且它是“不公平的”,因为它不保证先到的线程先执行。
所以synchronized “效率较低。”。因为阻塞,挂起,切换上下文,恢复线程,都需要转入内核态完成,开销很大。
独占锁还可以通过下面synchronized(){}区块的形式,划定独占锁的执行范围。小括号里用来指定独占锁针对的对象。花括号划定需要用锁的范围。
class Mutex implements Runnable{
private volatile int num=0; //“private”禁止外部方法调用
public void increment(){
synchronized(this){
num++;
Thread.yield();
num++;
System.out.println(num);
}
}
}
对于独占锁,或者独占区块,需要记住:独占锁定义的是:“过程”在访问“变量”的时候,需要取得变量的“独占令牌”。 打个不恰当的比方:“飞机上大家要使用公共厕所”。这时候:
下面是一个测试的例子:我有一个输出结果永远为偶数的自增操作单元。可以每次自增2,以及检查输出是不是偶数。
public class TestLock{
private volatile int num=0;
public void increment(){
//synchronized(this){ //可以锁这里
num++; Thread.yield(); num++;
//}
}
public int getNum(){
//synchronized(this){ //可以锁这里
return num;
//}
}
}
然后,写两个Runnable类允许多线程同时调用自增操作单元。
public class Checker2{
private volatile TestLock tl;
public Checker2(TestLock tl){this.tl=tl;}
public class Run implements Runnable{
@Override
public void run(){
//synchronized(Checker2.this){ //这里不行
long stop=System.currentTimeMillis()+10;
while(System.currentTimeMillis()<stop){
synchronized(tl){ ////这里可以
tl.increment();
}
}
//}
}
}
public class Check implements Runnable{
@Override
public void run(){
//synchronized(Checker2.this){ //这里不行
long stop=System.currentTimeMillis()+10;
int num;
while(System.currentTimeMillis()<stop){
synchronized(tl){ //这里可以
num=tl.getNum();
}
if(num%2!=0){
System.out.println(num);
Thread.yield();
}
}
//}
}
}
//main execute the runnable
public static void main(String[] args){
Checker2 ck=new Checker2(new TestLock());
ExecutorService es=Executors.newCachedThreadPool();
es.execute(ck.new Run());
es.execute(ck.new Check());
es.shutdown();
}
}
注意,我们要求取得独占令牌的对象可以是当前类型的一个成员字段“TestLock tl”。锁可以加在TestLock类的被调用的原始操作上。也可以加在Checker2涉及“TestLock tl”的过程上。
但注意临界区尽量精准,千万不要直接套在整个while()的外面。这样“TestLock tl”的锁会被一直占用,导致其他线程完全无法获得令牌。
除了synchronized之外,另一个选择是使用ReentrantLock,又叫“乐观锁”,或者“可重入锁”。用法和效果和synchronized都差不多。差别是它必须显式地创建锁,锁住和解锁。
class Mutex implements Runnable{
private volatile int num=0; //“private”禁止外部方法调用
private Lock lock=new ReentrantLock();
public void increment(){
lock.lock();
try{
num++;
Thread.yield();
num++;
System.out.println(num);
}finally{
lock.unlock();
}
}
}
但ReentrantLock解决资源冲突的机制,使用了和synchronized不同的 非阻塞算法(non-blocking algorithms)。简单说就是:乐观地假设操作不会频繁地引起冲突。不管三七二十一先进行操作,如果没有其他线程争用共享数据,那操作就成功了。如果共享数据被争用,产生了冲突,那就再进行其他的补偿措施(最常见的补偿措施就是不断地重试,直到试成功为止)。所以采取这种策略的锁都可以叫 “乐观锁”。
乐观锁的核心思想是基于volatile int state这样的一个属性,同时配合Unsafe工具用原子性的操作来实现对当前锁的状态进行修改。当state的值为0的时候,标识改Lock不被任何线程所占有。
乐观锁的关键点就在于:需要冲突检测和修改状态这两个步骤具备原子性,这样就能保证不可能同时有两个线程同时修改了状态,同时抢到了锁。这是靠一个叫 CAS(Compare and Swap) 的硬件指令来保证的。看一下 ReentrantLock 的源码,核心的获取锁的方法是:compareAndSetState(),它调用Java本地方法compareAndSwapInt,而compareAndSwapInt方法内部又是借助C语言来调用底层intel处理器的CAS指令 《cmpxchg》 指令来完成的。先确认一遍State现在是不是0,如果是,那代表目前Lock是可用的,然后马上改变State的状态,获取锁。这一系列动作全包含在一个cmpxchg指令中,执行一个机器码的过程是不可能中间停下来,切换上下文到另一个线程的。这正是可重入锁线程安全的理论保障。
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
... ...
}
抢占锁失败之后的重入策略,和Synchronized独占锁就不同了,不会马上阻塞挂起等待唤醒。而是进入一个等待队列,以期重新获取锁。所以关键就在这个队列上,它叫 AbstractQueuedSynchronizer(简称AQS)。
这个AQS就很重要了,它是JDK1.5提供的一个 基于FIFO等待队列实现的一个用于实现同步器的基础框架。AQS有多重要呢,可以说java.util.concurrent包里面几乎所有的有关锁、多线程并发以及线程同步器等重要组件的实现都是基于AQS这个框架。下图展示了ReentrantLock和这个框架的关系。
AbstractQueuedSynchronizer模板类中定义的主要方法比如tyrAcuire()尝试获取锁,tryRelease()尝试释放锁。而AbstractQueuedSynchronizer本身基于一个FIFO队列。它的核心思想是:避免所有等待获取锁的线程都不停地进行尝试,这样会引发“羊群效应”大量消耗资源,变得低效。改为只让排在队列头部的线程尝试获取锁。这就是ReentrantLock相对高效的原因。具体关于AbstractQueuedSynchronizer的细节,可以参看《扒一扒ReentrantLock以及AQS实现原理》这篇文章。
用ReentrantLock时的一个惯用法是:“上锁/try/finally/解锁”。不管操作是否成功,最后都要解锁。
public void increment(){
lock.lock();
try{
++counter;
}finally{
lock.unlock();
}
}
光有锁并不能保证“线程安全”。还必须给竟态资源加上volatile关键字。volatile是指:保持变量的 “可见性”。什么是可见性?SUN官方定义是这样的:
什么意思?看下图:
Java的内存模型是这样:每个Thread都有一小块“缓存区”,不是之内存,是CPU里的缓存区。如果一个变量的值被改变,可能只是先缓存在这个缓存区,内存上变量的值没有被改写。这就导致对变量操作“可见性”的问题。比如我先改变一个变量值,然后从内存中读取它,可能读取出来的还是原始值。相当于值的改变对我们不可见。
这里的“happens-before relationship(偏序关系)”指的就是,必须保证如果值的改变发生在读取之前,那么这个改变要确确实实写进内存,让读取操作“可见”。
所以“可见性”粗略说就是:每次值的写入都直接写进内存,而不使用CPU缓存的优化。
所以,所有用volatile关键字修饰的变量,他们的“读”,“写”操作都保证是原子性的。 这一定要记住!
所以,线程安全的三个关键词:“互斥性”,“可见性”,“原子性”。
ExecutorService#shutdown():不再接受新任务。
ExecutorService#shutdownNow():立刻终止所有任务。
ExecutorService#awaitTermination():阻塞直到所有现有任务完成,然后结束所有线程,关闭线程池。
这里的主角是阻塞(blocked)状态。有四种情况能进入阻塞状态:
一个线程在运行过程中是可以中断的。我们可以调用Thread.interrupt()方法来实现对线程的中断。或者执行shutdown(),shutdownNow()方法也会调用Thread.interrupt()方法来中断线程。当一个线程在正常运行状态被中断,会抛出一个InterruptedException。
但中断请求本质上只是礼貌地请求另一个线程在它愿意并且方便的情况下停止它正在做的事,并没有强制力。所以当线程处于阻塞状态时,中断的请求并不一定会被理睬。概括来说规则很简单:
当线程不可中断时,还是会将线程的中断状态(interrupted status)设置为true。所以并发编程的一个惯用法就需要在监听InterruptedException的同时,还要轮询线程的中断状态,才能确保程序总是能及时退出。
public void run() {
try {
while (!Thread.currentThread().isInterrupted()){
/* do something */
}
} catch (InterruptedException ie) {
/* print something */
}
}
wait()阻塞挂起当前线程的同时,释放互斥锁。这点和sleep()不同,sleep()不释放互斥锁。
someObject.notifyAll();
someObject.wait();
先唤醒正在等待某个对象互斥锁的所有线程,然后再阻塞挂起当前线程,释放互斥锁,这样做是安全的。
另外wait()的一个惯用法是:尽量把wait()放在一个while(!condition){wait();}里面。防止醒来后却发现不满足条件的情况。
最后,对某个对象调用wait()和notify(),notifyAll()之前先获得这个对象上的互斥锁。
notify()和notifyAll()的区别在于,notifyAll()唤醒所有排队线程,而notify()只唤醒其中一个线程,但却无法控制唤醒的是哪一个。
notifyAll()的策略就是,在这个锁上等的线程都叫醒。由线程自己判断这次的事务是否和自己有关。
notify()只叫醒一个线程,线程也需要自己判断这次的事务是否和自己有关。但notify()和notifyAll()的区别在于,如果任务和被唤醒的线程无关,继续睡之前,此线程还需要把接力棒传下去唤醒另一个线程,虽然它也不清楚唤醒的是哪个线程。
所以一般来说notifyAll()更合理一些。特殊情况用notify()要小心。
这里再次强调interrupt的有效范围:
这是一个交叉模型,无论是生产者还是消费者,都秉持同一个逻辑:
除了wait()和notifyAll()来完成线程间的协作。conditon#await()和conditon#signalAll()也能实现同样的功能。
和wait()以及notifyAll()是附着于Object不同。conditon#await()和conditon#signalAll()是附着于Lock。
这里贴一个Oracle官方的例子:例子里通过两个条件来控制不同线程。
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
后面的练习28也是一个使用condition很好的例子。
无论通过Object#wait(),notify()组合还是condition#await(),signal()组合,这种通过互斥锁握手来实现同步的策略还是有点复杂。
一个更简单的解决方案是BlockingQueue。它的特性主要有两点:
BlockingQueue的阻塞能被中断。
由于BlockingQueue极大地降低了并发场景的复杂度,在能用BlockingQueue解决问题的情况下,应该尽量使用BlockingQueue。
适用于某个任务必须等好几个前置任务完成之后才能执行,那用CountDownLatch把这个任务锁住。每完成一个任务latch就countDown()一下。当latch为零时,这个被锁住的任务就会自动运行。
两个特性:BlockingQueue,Priority。其中DelayQueue的优先级是以delay的长短实现,时间上排在前面的优先级更高。
schedule()延迟执行一次任务 scheduleAtFixedRate()周期性执行任务
计数信号量顾名思义就像一个“计数器”。用来管理有限个数的对象。acquire()申请对象。release()释放对象。当计数器满,也就是对象全部被拿走之后,申请线程会被阻塞。Semaphore本身并不持有对象,它只是个计数器,一个线程安全的计数器。
size代表对象的数量。fair代表是不是一个公平锁,也就是被阻塞的申请线程是不是FIFO排队等对象被释放。
Semaphore(int size, boolean fair)
需要注意,Semaphore是没有对每个对象加synchronized独占锁,因为否则acquire()锁住某个对象后,release()就被阻塞住,无法释放。但Semaphore是线程安全的。因为acquire()和release()两个方法都是原子性的。底层都基于原子性的CAS (compare and swap)机器码。
书里列举了利用Semaphore实现一个“对象池”模型。参见练习33.
Exchanger也是模拟一个“生产者-消费者”的场景。想象这样一个场景:妈妈煎饼,我吃饼。
朴素的“生产者-消费者”模型是:只有一个盘子,只能放5张饼。妈妈煎5张饼,装满盘子叫我。我吃完5张饼叫妈妈。妈妈继续煎饼。
Exchanger的“生产者-消费者”模型的不同在于:有两个盘子,容量可以不一样,比如一号盘能装5张,二号盘装3张。初始我和妈妈分别持有其中一个。妈妈剪完5张饼往一号盘里放,叫我。同时我正在吃二号盘里的三张饼。吃完了了叫妈妈。此时我和妈妈握手,交换盘子。我拿到装满5张饼的一号盘继续吃。妈妈拿到空的二号盘,继续煎饼。具体的煎饼模型,参见练习34.
Exchanger有一个需要注意的地方:当一个线程提出交换的时候,如果它的patener线程已经提出了交换,那当前线程可以直接获得patener线程提供的对象,继续运行。这就是为什么练习34里,两个线程各持有10个盘子,但运行的结果是生产者先生产20个煎饼,然后消费者再吃20个煎饼。
记住下面这些基本结论:
所以关于锁的选择的一个基调应该是:对性能不太敏感的时候,尽量使用synchronized以保持代码的可读性。只有当性能需要严格调优时才考虑替换成Lock。
同步的原理是在写入的时候会先创造一个副本,先在副本上修改,最后通过一个原子性的动作把副本和原本同步。
常规的ReentrantLock的任意读写操作都是互斥的。很好地保护了线程安全。但如果要进一步提高效率的话,读操作其实是不用独占锁的。所以就有了ReentrantReadWriteLock。它的readLock是“共享锁”,writeLock是“独占锁”。所以它允许多个线程同时进行读操作。但每个时刻都只能有一个线程进行写操作。并且读操作和写操作也不能同时进行。
关于ReentrantReadWriteLock的具体操作,参见练习40.
Thread间的切换完全看JVM心情。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise1 implements Runnable{
private static int count=0;
private int id=++count;
private int max=10;
public Exercise1(){System.out.println("Operation NO."+id+" initialized ... ");}
public void run(){
for(int i=0;i<3;i++){
System.out.println("#"+id+"("+(max--)+")");
Thread.yield();
}
System.out.println("Operation NO."+id+" is finished!");
return;
}
public static void main(String[] args){
for(int i=0;i<5;i++){
new Thread(new Exercise1()).start();
}
}
}
运行结果还是看JVM心情。
package com.ciaoshen.thinkinjava.chapter21;
public interface Generator<T>{public T next();}
package com.ciaoshen.thinkinjava.chapter21;
public class Fibonacci implements Generator<Integer>,Runnable{
private int count;
private int index=0;
public Fibonacci(int num){count=num;}
public boolean hasNext(){return count>0;}
public Integer next(){return count-->0? fibo(++index):-1;}
public int fibo(int num){
if(num==1){
return 1;
}
if(num==2){
return 1;
}
return fibo(num-1)+fibo(num-2);
}
public void run(){
while(hasNext()){
System.out.print(next()+" ");
}
System.out.println("");
}
public static void main(String[] args){
Fibonacci f=new Fibonacci(8);
f.run();
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise2 {
public static void main(String[] args){
for(int i=0;i<10;i++){
new Thread(new Fibonacci(i+1)).start();
}
}
}
结果除了最后一个SingleThreadExecutor输出结果是顺序的,其他三个都是乱序。不代表他们都没有能力,只是还没有用其中的控制函数。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise3 implements Runnable, Generator<String>{
private static int count;
private int id=++count;
private int num; //remains how many
public Exercise3(int num){this.num=num;}
public boolean hasNext(){return num>0;}
public String next(){return num>0? "#"+id+"["+(num--)+"] ":"NULL";}
public void run(){
while(hasNext()){
System.out.print(next());
}
}
public static void testDifferentThreadPool(ExecutorService ex){
for(int i=0;i<10;i++){
ex.execute(new Exercise3(i+5));
}
ex.shutdown();
}
public static void main(String[] args){
Exercise3.testDifferentThreadPool(Executors.newCachedThreadPool());
Exercise3.testDifferentThreadPool(Executors.newFixedThreadPool(5));
Exercise3.testDifferentThreadPool(Executors.newScheduledThreadPool(5));
Exercise3.testDifferentThreadPool(Executors.newSingleThreadExecutor());
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise4 {
public static void testDifferentThreadPool(ExecutorService ex){
for(int i=0;i<10;i++){
ex.execute(new Fibonacci(i+5));
}
ex.shutdown();
}
public static void main(String[] args){
Exercise4.testDifferentThreadPool(Executors.newCachedThreadPool());
Exercise4.testDifferentThreadPool(Executors.newFixedThreadPool(5));
Exercise4.testDifferentThreadPool(Executors.newScheduledThreadPool(5));
Exercise4.testDifferentThreadPool(Executors.newSingleThreadExecutor());
}
}
package com.ciaoshen.thinkinjava.chapter21;
public class Fibonacci implements Generator<Integer>,Runnable{
private int count;
private int index=0;
public Fibonacci(int num){count=num;}
public boolean hasNext(){return count>0;}
public Integer next(){return count-->0? fibo(++index):-1;}
public int fibo(int num){
if(num==1){
return 1;
}
if(num==2){
return 1;
}
return fibo(num-1)+fibo(num-2);
}
public void run(){
while(hasNext()){
System.out.print(next()+" ");
}
System.out.println("");
}
public static void main(String[] args){
Fibonacci f=new Fibonacci(8);
f.run();
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.*;
import java.util.concurrent.*;
public class Exercise5 implements Callable<Integer>{
private Fibonacci f;
public Exercise5(int num){
f=new Fibonacci(num);
}
public Integer call(){
int sum=0;
while(f.hasNext()){
sum+=f.next();
}
return sum;
}
public static void main(String[] args) {
ExecutorService ex=Executors.newCachedThreadPool();
try{
List<Future<Integer>> list=new ArrayList<Future<Integer>>();
for(int i=0;i<10;i++){
list.add(ex.submit(new Exercise5(i+1)));
}
for(Future<Integer> f:list){
System.out.println(f.get());
}
}catch(InterruptedException ie){
System.out.println(ie);
}catch(ExecutionException ee){
System.out.println(ee);
}finally{
ex.shutdown();
}
}
}
主线程休眠时间不够的话,守护线程没有时间运行。 但守护进程运行的时间非常不稳定。经常是尽管主线程有足够的休眠时间,也得不到执行。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
class Daemon implements Runnable {
private Thread[] t = new Thread[100];
public void run() {
for(int i = 0; i < t.length; i++) {
t[i] = new Thread(new DaemonSpawn());
t[i].start();
System.out.print("DaemonSpawn " + i + " started, ");
}
for(int i = 0; i < t.length; i++)
System.out.print("t[" + i + "].isDaemon() = " +
t[i].isDaemon() + ", ");
while(true)
Thread.yield();
}
}
class DaemonSpawn implements Runnable {
public void run() {
while(true){
Thread.yield();
}
}
}
public class Exercise7{
public static void main(String[] args) throws Exception {
Thread d = new Thread(new Daemon());
d.setDaemon(true);
d.start();
//System.out.print("d.isDaemon() = " + d.isDaemon() + ", ");
//TimeUnit.NANOSECONDS.sleep(1);
//TimeUnit.MILLISECONDS.sleep(1);
//TimeUnit.MICROSECONDS.sleep(1);
//TimeUnit.SECONDS.sleep(1);
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise8 extends Thread {
private int countDown = 50;
private static int threadCount = 0;
public Exercise8() {
// Store the thread name:
super(Integer.toString(++threadCount));
setDaemon(true);
start();
}
public String toString() {
return "#" + getName() + "(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.print(this);
if(--countDown == 0){
return;
}
}
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++){
Thread th=new Exercise8();
}
/**
* Additional time
*
try{
TimeUnit.NANOSECONDS.sleep(1);
}catch(InterruptedException ie){
System.out.println(ie);
}
*/
}
}
这题有两个坑:
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise9 implements Runnable{
public static class Exercise9ThreadFactory implements ThreadFactory{
private int priority;
public Exercise9ThreadFactory(int lev){priority=lev;}
@Override
public Thread newThread(Runnable r){
Thread th=new Thread(r);
th.setPriority(priority);
return th;
}
}
private int countDown = 5;
private volatile double d; // No optimization
public String toString() {
return Thread.currentThread() + ": " + countDown;
}
public void run() {
while(true) {
// An expensive, interruptable operation:
for(int i = 1; i < 100000000; i++) {
d += (Math.PI + Math.E) / (double)i;
if(i % 1000 == 0)
Thread.yield();
}
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
ExecutorService exLow = Executors.newCachedThreadPool(new Exercise9.Exercise9ThreadFactory(Thread.MIN_PRIORITY));
ExecutorService exHigh = Executors.newCachedThreadPool(new Exercise9.Exercise9ThreadFactory(Thread.MAX_PRIORITY));
for(int i = 0; i < 5; i++){
exLow.execute(new Exercise9());
}
exHigh.execute(new Exercise9());
exLow.shutdown();
exHigh.shutdown();
}
}
import java.util.concurrent.*;
public class Exercise10{
//Executor
private ExecutorService es=Executors.newCachedThreadPool();
private Fibonacci f=new Fibonacci();
//inner Callable
public Future runTask(int num){
return es.submit(new Callable<Integer>(){
public Integer call(){
Integer result=0;
int index=0;
while(++index<=num){
result+=f.fibo(index);
}
return result;
}
});
}
public void close(){es.shutdown();}
public static void main(String[] args){
Exercise10 ex=new Exercise10();
Future f=ex.runTask(10);
ex.close();
try{
System.out.println(f.get());
}catch(InterruptedException ie){
System.out.println(ie);
}catch(ExecutionException ee){
System.out.println(ee);
}
}
}
例子的两个字段存放Fibonacci数列的前两个数字。fiboNext()方法动态将Fibonacci数列往前推进。show()方法显示当前两个基本数字之和。可以在有synchronized锁,或者无synchronized锁之间自由切换。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise11 implements Runnable{
public static class ThreadFactory11 implements ThreadFactory{
private int count=0;
public Thread newThread(Runnable r){
return new Thread(r,String.valueOf(++count));
}
}
private volatile int base1=1;
private volatile int base2=1;
//public void fiboNext(){
public synchronized void fiboNext(){
int sum=base1+base2;
base1=base2;
Thread.yield();
base2=sum;
}
//public void show(){
public synchronized void show(){
System.out.println("Thread #"+Thread.currentThread().getName()+" >>> "+String.valueOf(base1)+"+"+String.valueOf(base2)+"="+String.valueOf(base1+base2));
}
//public void run(){
public synchronized void run(){
for(int i=0;i<5;i++){
fiboNext();
show();
}
}
public static void main(String[] args){
Runnable r=new Exercise11();
ExecutorService es=Executors.newCachedThreadPool(new ThreadFactory11());
for(int i=0;i<5;i++){
es.execute(r);
}
es.shutdown();
}
}
无synchronized锁情况下的输出:不但Fibonacci数列的结果不对,而且还有重复计算的情况,而且显示的顺序也乱序。
Thread #4 >>> 5+8=13
Thread #2 >>> 3+5=8
Thread #1 >>> 3+5=8
Thread #5 >>> 5+10=15
Thread #1 >>> 25+40=80
Thread #5 >>> 40+65=105
Thread #3 >>> 3+5=8
Thread #1 >>> 65+105=170
Thread #3 >>> 105+210=315
Thread #2 >>> 15+25=40
Thread #4 >>> 10+15=25
Thread #2 >>> 525+840=1365
Thread #3 >>> 315+525=840
Thread #1 >>> 210+315=525
Thread #3 >>> 2205+3570=5775
Thread #5 >>> 105+170=275
Thread #3 >>> 5775+9345=15120
Thread #1 >>> 3570+5775=9345
Thread #2 >>> 1365+2205=4410
Thread #2 >>> 15120+24465=39585
Thread #4 >>> 840+1365=2205
Thread #5 >>> 9345+15120=24465
Thread #4 >>> 24465+39585=64050
Thread #5 >>> 39585+64050=103635
Thread #4 >>> 64050+103635=167685
有了synchronized锁之后,一切正常。不管有多少个线程一起工作,Fibonacci数列都正确计算,按顺序输出:
Thread #1 >>> 1+2=3
Thread #1 >>> 2+3=5
Thread #1 >>> 3+5=8
Thread #1 >>> 5+8=13
Thread #1 >>> 8+13=21
Thread #5 >>> 13+21=34
Thread #5 >>> 21+34=55
Thread #5 >>> 34+55=89
Thread #5 >>> 55+89=144
Thread #5 >>> 89+144=233
Thread #4 >>> 144+233=377
Thread #4 >>> 233+377=610
Thread #4 >>> 377+610=987
Thread #4 >>> 610+987=1597
Thread #4 >>> 987+1597=2584
Thread #3 >>> 1597+2584=4181
Thread #3 >>> 2584+4181=6765
Thread #3 >>> 4181+6765=10946
Thread #3 >>> 6765+10946=17711
Thread #3 >>> 10946+17711=28657
Thread #2 >>> 17711+28657=46368
Thread #2 >>> 28657+46368=75025
Thread #2 >>> 46368+75025=121393
Thread #2 >>> 75025+121393=196418
Thread #2 >>> 121393+196418=317811
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise12 implements Runnable{
private volatile int i=0;
private int duration;
public Exercise12(int duration){
this.duration=duration;
}
public synchronized void f3(){
i++;i++;i++;
}
public void run(){
long stop=System.currentTimeMillis()+duration;
while(System.currentTimeMillis() < stop){
f3();
}
Thread.yield();
}
public int getValue(){return i;}
public class Checker implements Runnable{
public void check(){
int value;
synchronized(Exercise12.this){
value=getValue();
}
if(value%3!=0){
System.out.println(value);
}
}
public void run(){
while(true){
check();
Thread.yield();
}
}
}
public static void main(String[] args){
ExecutorService mainService=Executors.newCachedThreadPool();
ExecutorService daemonService=Executors.newCachedThreadPool(new ThreadFactory(){
public Thread newThread(Runnable r){
Thread th=new Thread(r);
th.setDaemon(true);
return th;
}
});
Exercise12 task=new Exercise12(1000);
Checker checker=task.new Checker();
mainService.execute(task);
daemonService.execute(checker);
mainService.shutdown();
daemonService.shutdown();
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise14{
public static void action(){
for(int i=0;i<1000;i++){
new Timer().schedule(new TimerTask(){
public void run(){
for(int j=5;j>0;j--){
System.out.println("#"+Thread.currentThread().getName()+ " >>> " +j);
try{
Thread.sleep(1000);
}catch(InterruptedException ie){
System.out.println("Sleep Interrupted!");
}
}
}
},6000);
}
}
public static void main(String[] args){
Exercise14.action();
}
}
取消注释,换成synchronized(this),就变成对三个不同的对象加同步锁。同步就被破坏。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise15 implements Runnable{
private volatile String str;
public Exercise15(String name){str=name;}
public class Aaa implements Runnable{
public void run(){
//synchronized (this){
synchronized (Exercise15.this){
str=str.substring(0,4);
str+="AAA";
}
}
}
public class Bbb implements Runnable{
public void run(){
//synchronized (this){
synchronized (Exercise15.this){
str=str.substring(0,4);
str+="BBB";
}
}
}
public class Ccc implements Runnable{
public void run(){
//synchronized (this){
synchronized (Exercise15.this){
str=str.substring(0,4);
str+="CCC";
}
}
}
public void run(){
synchronized(this){
System.out.println(str);
}
}
public static void main(String[] args){
ExecutorService executor=Executors.newCachedThreadPool();
Random rand=new Random();
int x=0;
Exercise15 theMain=new Exercise15("STR-");
Runnable aaa=theMain.new Aaa(), bbb=theMain.new Bbb(), ccc=theMain.new Ccc();
for(int i=0;i<100;i++){
x=rand.nextInt(3);
switch(x){
case 0:
executor.execute(aaa); break;
case 1:
executor.execute(bbb); break;
case 2:
executor.execute(ccc); break;
}
executor.execute(theMain);
}
executor.shutdown();
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
import java.util.concurrent.locks.*;
public class Exercise16 implements Runnable{
private volatile String str;
public Exercise16(String name){str=name;}
private Lock lock=new ReentrantLock();
public class Aaa implements Runnable{
public void run(){
lock.lock();
try{
str=str.substring(0,4);
str+="AAA";
}finally{
lock.unlock();
}
}
}
public class Bbb implements Runnable{
public void run(){
lock.lock();
try{
str=str.substring(0,4);
str+="BBB";
}finally{
lock.unlock();
}
}
}
public class Ccc implements Runnable{
public void run(){
lock.lock();
try{
str=str.substring(0,4);
str+="CCC";
}finally{
lock.unlock();
}
}
}
public void run(){
lock.lock();
try{
System.out.println(str);
}finally{
lock.unlock();
}
}
public static void main(String[] args){
ExecutorService executor=Executors.newCachedThreadPool();
Random rand=new Random();
int x=0;
Exercise16 theMain=new Exercise16("STR-");
Runnable aaa=theMain.new Aaa(), bbb=theMain.new Bbb(), ccc=theMain.new Ccc();
for(int i=0;i<100;i++){
x=rand.nextInt(3);
switch(x){
case 0:
executor.execute(aaa); break;
case 1:
executor.execute(bbb); break;
case 2:
executor.execute(ccc); break;
}
executor.execute(theMain);
}
executor.shutdown();
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise17{
private static int macro=0;
//count
private volatile int count=0;
private boolean canceled=false;
private Random rand=new Random();
private List<Detector> list=new ArrayList<Detector>();
//increment
public synchronized void increment(){count++;}
//show count
public synchronized int getCount(){return count;}
//cancel
public void cancel(){canceled=true;}
//Runnable inner class: each sub counter
public class Detector implements Runnable{
private int id=++macro;
private int number=0;
public Detector(){list.add(this);}
public void run(){
int seed;
while(!canceled){
seed=rand.nextInt(10);
switch(seed){
case 0: case 1: case 2:
increment(); number++; break;
default: break;
}
}
}
public int getNumber(){return number;}
public int getId(){return id;}
}
public class Listener implements Runnable{
public void run(){
while(!canceled){
for(Detector d:list){
System.out.println("Detector #"+d.getId()+": "+d.getNumber()+" Total: "+getCount());
}
}
}
}
//main
public static void main(String[] args){
ExecutorService es=Executors.newCachedThreadPool();
Exercise17 rc=new Exercise17();
for(int i=0;i<10;i++){
es.execute(rc.new Detector());
}
es.execute(rc.new Listener());
try{
TimeUnit.MILLISECONDS.sleep(3000);
rc.cancel();
es.shutdown();
if(!es.awaitTermination(250,TimeUnit.MILLISECONDS)){
System.out.println("Some thread not terminated!");
}
}catch(InterruptedException ie){
System.out.println("Termination Interrupted!");
}finally{
rc.cancel();
es.shutdown();
}
}
}
这题很好地演示了面对interrupt()只能在sleep()的情况下中断线程的情况下,怎么利用interrupted()判断来迫使程序在非阻塞状态下也能正常中断。这是很好的一种惯用法(良好实践)。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise18{
public Exercise18(){
try{
System.out.println("I am awake!");
long begin=System.currentTimeMillis()+1000;
while(!Thread.interrupted()){
if(System.currentTimeMillis()>=begin){
System.out.println("Sleeping...");
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("I am awake!");
begin=System.currentTimeMillis()+1000;
}
}
System.out.println("Interrupted in daytime!");
}catch(InterruptedException ie){
System.out.println("Interrupted while sleeping!");
}
}
public static class GotoSleep implements Runnable{
public void run(){
new Sleep();
}
}
public static void main(String[] args){
ExecutorService es=Executors.newCachedThreadPool();
Future<?> f=es.submit(new Exercise18.GotoSleep());
es.shutdown();
try{
TimeUnit.MILLISECONDS.sleep((long)(new Random().nextInt(10000)));
}catch(InterruptedException ie){
System.out.println("Test Interrupted!");
}finally{
f.cancel(true);
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.*;
public class Count {
private int count = 0;
private Random rand = new Random(47);
// Remove the synchronized keyword to see counting fail:
public synchronized int increment() {
int temp = count;
if(rand.nextBoolean()) // Yield half the time
Thread.yield();
return (count = ++temp);
}
public synchronized int value() { return count; }
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Entrance implements Runnable {
private static Count count = new Count();
private static List<Entrance> entrances =
new ArrayList<Entrance>();
private int number = 0;
// Doesn’t need synchronization to read:
private final int id;
public Entrance(int id) {
this.id = id;
// Keep this task in a list. Also prevents
// garbage collection of dead tasks:
entrances.add(this);
}
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
++number;
}
System.out.println(this + " Total: " + count.increment());
TimeUnit.MILLISECONDS.sleep(100);
}
} catch(InterruptedException e) {
}finally{
System.out.println(this+" interrupted!");
}
}
public synchronized int getValue() { return number; }
public String toString() {
return "Entrance " + id + ": " + getValue();
}
public static int getTotalCount() {
return count.value();
}
public static int sumEntrances() {
int sum = 0;
for(Entrance entrance : entrances)
sum += entrance.getValue();
return sum;
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise19 {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++){
exec.execute(new Entrance(i));
}
// Run for a while, then stop and collect the data:
TimeUnit.SECONDS.sleep(3);
exec.shutdownNow();
if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS)){
System.out.println("Some tasks were not terminated!");
}
System.out.println("Total: " + Entrance.getTotalCount());
System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class LiftOff implements Runnable {
protected int countDown = 10; // Default
private static int taskCount = 0;
private final int id = taskCount++;
private long begin,end;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
}
public void run() {
begin=System.currentTimeMillis();
try{
while(!Thread.interrupted() && countDown-- > 0) {
System.out.println(status());
TimeUnit.MILLISECONDS.sleep(100);
}
}catch(InterruptedException ie){}finally{
System.out.println("Thread#"+Thread.currentThread().getName()+" --> Task#["+id+"] recieve the Interruption signal after "+((System.currentTimeMillis()-begin)/(float)1000)+" seconds!");
}
}
}
这题的坑在于对InterruptedException信号的处理。不要在实际业务逻辑函数randomWord()和format()里过早拦截和处理InterruptedException,否则就永远跳不出Runnable模块里的while(!Thread.interrupted())的循环。
这里有必要再次强调interrupt的有效范围:
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise21{
private Random rand=new Random();
private char[] word=new char[rand.nextInt(15)+1];
private boolean prepared=false;
private boolean formatted=false;
public void reset(){
word=new char[rand.nextInt(15)+1];
prepared=false;
formatted=false;
}
public void preTest(){
try{
new PrepareWord().randomWord();
new FormatWord().format();
}catch(InterruptedException ie){
System.out.println("Task interrupted!");
}
}
public void start(){
ExecutorService es=Executors.newCachedThreadPool();
es.execute(new PrepareWord());
es.execute(new FormatWord());
try{
TimeUnit.SECONDS.sleep(20);
}catch(InterruptedException ie){
System.out.println("Test interrupted incorrectly!");
}finally{
es.shutdownNow();
}
}
public class PrepareWord implements Runnable {
public void randomWord() throws InterruptedException{ //这里不处理interrupt信号很重要,抛出去让Runnable处理
for(int i=0;i<word.length;i++){
char c=(char)(((int)'a')+rand.nextInt(26));
word[i]=c;
System.out.print(c);
TimeUnit.MILLISECONDS.sleep(200);
}
System.out.println("");
prepared=true;
}
public void run(){
synchronized(Exercise21.this){
try{
while(!Thread.interrupted()){
randomWord();
Exercise21.this.notifyAll();
while(!formatted){
Exercise21.this.wait();
}
System.out.println(">>> String 《"+new String(word)+"》constructed!");
reset();
}
}catch(InterruptedException ie){
System.out.println("Word generation interrupted!");
}
System.out.println("Word Generator exit correctly!");
}
}
}
public class FormatWord implements Runnable{
public void format() throws InterruptedException{ //这里不处理interrupt信号很重要,抛出去让Runnable处理
for(int i=0;i<word.length;i++){
if(i%2==0){
word[i]=Character.toUpperCase(word[i]);
}
System.out.print(word[i]);
TimeUnit.MILLISECONDS.sleep(200);
}
System.out.println("");
formatted=true;
}
public void run(){
synchronized(Exercise21.this){
try{
while(!Thread.interrupted()){
while(!prepared || formatted){
Exercise21.this.wait();
}
format();
Exercise21.this.notifyAll();
}
}catch(InterruptedException ie){
System.out.println("Word format interrupted!");
}
System.out.println("Word Decorator exit correctly!");
}
}
}
public static void main(String[] args){
Exercise21 ex=new Exercise21();
//ex.preTest();
ex.start();
}
}
光做true or false太无聊,稍微改动为地球人和外星人的大战。标志改成int,表示地球陷落程度。故事模型是这样:
外星人远古人的战争,就是处于忙等状态,两个线程由JVM控制交替运行。while(!Thread.interrupted())一直在循环。 外星人和现代人的战争,虽然外星人的每次进攻都会调用notifyAll()给人类警醒,但人类的设定是不到家园彻底沦陷,不会反击。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise22{
private Random rand=new Random();
private volatile int earthFallen=0;
public synchronized void save() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(10);earthFallen--;}
public synchronized void fallen() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(10);earthFallen++;}
public synchronized void show(){
int result=Math.min(earthFallen,100)/5;
for(int y=0;y<20-result;y++){
System.out.print("+");
}
for(int x=0;x<result;x++){
System.out.print("-");
}
System.out.println("");
}
public class PrimitiveAlien implements Runnable{
public void attack(int times) throws InterruptedException{
synchronized(Exercise22.this){
for(int i=0;i < times;i++){
fallen();
}
show();
}
}
public void run(){
try{
while(!Thread.interrupted()){
if(earthFallen<100){
attack(rand.nextInt(10));
}else{
attack(rand.nextInt(3));
}
}
}catch(InterruptedException ie){
System.out.println("Alien interrupted!");
}
System.out.println("TimeOver! We Alien "+ (earthFallen>=100? "WIN!":"LOSE!"));
}
}
public class Alien implements Runnable{
public void attack(int times) throws InterruptedException{
synchronized(Exercise22.this){
for(int i=0;i<times;i++){
fallen();
}
show();
}
}
public void run(){
try{
while(!Thread.interrupted()){
synchronized(Exercise22.this){
if(earthFallen<100){
attack(rand.nextInt(10));
}else{
attack(rand.nextInt(3));
}
Exercise22.this.notifyAll();
Exercise22.this.wait();
}
}
}catch(InterruptedException ie){
System.out.println("Alien interrupted!");
}
System.out.println("TimeOver! We Alien "+ (earthFallen>=100? "WIN!":"LOSE!"));
}
}
public class Primitive implements Runnable{
public void defense(int times) throws InterruptedException{
synchronized(Exercise22.this){
for(int i=0;i<times;i++){
save();
}
show();
}
}
public void run(){
try{
while(!Thread.interrupted()){
if(earthFallen<80){
defense(rand.nextInt(3));
}else if(earthFallen<100){
defense(rand.nextInt(8));
}else{
defense(rand.nextInt(10));
}
}
}catch(InterruptedException ie){
System.out.println("Primitive Interrupted!");
}
System.out.println("TimeOver! We Primitive "+ (earthFallen>=100? "LOSE!":"WIN!"));
}
}
public class Human implements Runnable{
public void defense(int times) throws InterruptedException{
synchronized(Exercise22.this){
for(int i=0;i<times;i++){
save();
}
show();
}
}
public void run(){
try{
while(!Thread.interrupted()){
synchronized(Exercise22.this){
while(earthFallen<100){
Exercise22.this.notifyAll();
Exercise22.this.wait();
}
defense(rand.nextInt(100));
}
}
}catch(InterruptedException ie){
System.out.println("Human Interrupted!");
}
System.out.println("TimeOver! We Human "+(earthFallen>100? "LOSE!":"WIN!"));
}
}
public static void main(String[] args){
Exercise22 ex=new Exercise22();
ExecutorService es=Executors.newCachedThreadPool();
//es.execute(ex.new PrimitiveAlien());
//es.execute(ex.new Primitive());
es.execute(ex.new Alien());
es.execute(ex.new Human());
try{
TimeUnit.SECONDS.sleep(new Random().nextInt(15)+1);
}catch(InterruptedException ie){
System.out.println("Test Interrupted!");
}finally{
es.shutdownNow();
}
}
}
下面的程序,就算是好几个WaxOn任务和WaxOff任务一起跑,也会正常上蜡,除蜡循环操作。
使用notify()替代notifyAll()的关键在于既然不能控制被唤醒的是不是恰当的任务,一个任务收到唤醒信号后,需要判断是不是符合醒来的条件,否则继续等待。而且,继续睡或者完成任务之前,还需要把接力棒传下去唤醒另一个任务,虽然它也不清楚唤醒的是哪个任务。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Car {
private boolean waxOn = false;
public synchronized void wax() throws InterruptedException {
waitForBuffing();
System.out.print("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
waxOn = true; // Ready to buff
}
public synchronized void buff() throws InterruptedException {
waitForWaxing();
System.out.print("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
waxOn = false; // Ready to wax
}
public synchronized void waitForWaxing() throws InterruptedException {
while(waxOn == false){
notify();
wait();
}
}
public synchronized void waitForBuffing() throws InterruptedException {
while(waxOn == true){
notify();
wait();
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
car.wax();
}
} catch(InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax On task");
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
car.buff();
}
} catch(InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax Off task");
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise23 {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5); // Run for a while...
exec.shutdownNow(); // Interrupt all tasks
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise24{
private volatile Queue<String> table=new LinkedList<String>();
private Random rand=new Random();
private volatile Fourniseur fourniseur=new Fourniseur();
private volatile Consumer consumer=new Consumer();
public String randomFood(){
StringBuilder sb=new StringBuilder();
for(int i=0;i<rand.nextInt(15)+1;i++){
sb.append((char)(((int)'a')+rand.nextInt(26)));
}
return sb.toString();
}
public void show() throws InterruptedException{
synchronized(table){
Iterator ite=table.iterator();
while(ite.hasNext()){
System.out.print(((String)ite.next()).substring(0,1));
TimeUnit.MILLISECONDS.sleep(10);
}
for(int j=0;j<10-table.size();j++){
System.out.print("-");
TimeUnit.MILLISECONDS.sleep(10);
}
System.out.println("");
}
}
public class Fourniseur implements Runnable{
public void run(){
try{
while(!Thread.interrupted()){
//让出生产锁,有7道菜就不上新菜
synchronized(this){
while(table.size()>7){
wait();
}
}
//占住消费锁,上一轮菜
System.out.println("Here come the new plate!");
synchronized(Exercise24.this.consumer){
for(int i=0;i<rand.nextInt(3)+1;i++){
synchronized(Exercise24.this.table){
table.add(randomFood());
}
}
show();
Exercise24.this.consumer.notifyAll();
}
}
}catch(InterruptedException ie){
System.out.println("Fourniseur interrupted!");
}
System.out.println("Fourniseur exit!");
}
}
public class Consumer implements Runnable{
public void run(){
try{
while(!Thread.interrupted()){
//让出消费锁,少于2道菜就等上菜
synchronized(this){
while(table.size()<2){
wait();
}
}
//占住生产锁,吃一轮
synchronized(Exercise24.this.fourniseur){
System.out.println("I can eat now!");
for(int i=0;i<rand.nextInt(2)+1;i++){
synchronized(Exercise24.this.table){
table.poll();
}
}
show();
Exercise24.this.fourniseur.notifyAll();
}
}
}catch(InterruptedException ie){
System.out.println("Fourniseur interrupted!");
}
System.out.println("Fourniseur exit!");
}
}
public static void main(String[] args){
Exercise24 ex=new Exercise24();
ExecutorService es=Executors.newCachedThreadPool();
es.execute(ex.fourniseur);
es.execute(ex.consumer);
try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException ie){
System.out.println("Test Interrupted!");
}finally{
es.shutdownNow();
}
}
}
我相当于只是去掉了chef循环中的sleep()。这样就不会每次都从try{}catch(){}里退出。
package com.ciaoshen.thinkinjava.chapter21;
public class Meal {
private final int orderNum;
public Meal(int orderNum) { this.orderNum = orderNum; }
public String toString() { return "Meal " + orderNum; }
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class WaitPerson implements Runnable {
private Exercise25 restaurant;
public WaitPerson(Exercise25 r) { restaurant = r; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal == null)
wait(); // ... for the chef to produce a meal
}
System.out.println("Waitperson got " + restaurant.meal);
synchronized(restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); // Ready for another
}
}
} catch(InterruptedException e) {
System.out.println("WaitPerson interrupted");
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Chef implements Runnable {
private Exercise25 restaurant;
private int count = 0;
public Chef(Exercise25 r) { restaurant = r; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal != null)
wait(); // ... for the meal to be taken
}
if(++count == 10) {
System.out.println("Out of food, closing");
restaurant.exec.shutdownNow();
}
System.out.print("Order up! ");
synchronized(restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
//TimeUnit.MILLISECONDS.sleep(100);
}
} catch(InterruptedException e) {
System.out.println("Chef interrupted");
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise25 {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Exercise25() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Exercise25();
}
}
思路就是:
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise26{
private int count=10;
private boolean tableClean=false;
private Meal meal=null;
private WaitPerson waitPerson;
private Chef chef;
private BusBoy busBoy;
private ExecutorService es;
public Exercise26(int count){
this.count=count;
es=Executors.newCachedThreadPool();
waitPerson=new WaitPerson();
chef=new Chef();
busBoy=new BusBoy();
es.execute(waitPerson);
es.execute(chef);
es.execute(busBoy);
}
//Meal
public class Meal{
private int id;
public Meal(int num){id=num;}
public String toString(){return "Meal "+id;}
}
//WaitPerson
public class WaitPerson implements Runnable{
public void run(){
try{
while(!Thread.interrupted()){
TimeUnit.MILLISECONDS.sleep(500);
//wait on self lock
synchronized(this){
while(meal==null){
wait();
}
}
//deliver the food on Chef lock
synchronized(Exercise26.this.chef){
System.out.println("Here is the food!");
//call busboy
synchronized(Exercise26.this.busBoy){
System.out.println("BusBoy, clean the table! ");
busBoy.notifyAll();
}
synchronized(this){
while(!tableClean){
wait();
}
}
System.out.println("Food served! ");
meal=null;
tableClean=false;
chef.notifyAll();
}
}
}catch(InterruptedException ie){
System.out.println("WaitPerson interrupted!");
}
}
}
//Chef
public class Chef implements Runnable{
private int count=0;
public void run(){
try{
while(!Thread.interrupted()){
//wait on self lock
synchronized(this){
while(meal!=null){
wait();
}
}
System.out.println("Order come!");
if(++count>Exercise26.this.count){
System.out.println("Out of meal!");
Exercise26.this.es.shutdownNow();
TimeUnit.MILLISECONDS.sleep(10);
}
//prepare food on WaitPerson lock
synchronized(Exercise26.this.waitPerson){
meal=new Meal(count);
System.out.println(meal+" prepared!");
waitPerson.notifyAll();
}
}
}catch(InterruptedException ie){
System.out.println("Chef interrupted!");
}
}
}
//BusBoy
public class BusBoy implements Runnable{
public void run(){
try{
while(!Thread.interrupted()){
//wait on self lock
synchronized(this){
wait();
}
//clean the table on the WaitPerson lock
synchronized(Exercise26.this.waitPerson){
tableClean=true;
System.out.println("Table cleaned!");
waitPerson.notifyAll();
}
}
}catch(InterruptedException ie){
System.out.println("BusBoy interrupted!");
}
}
}
public static void main(String[] args){
new Exercise26(10);
}
}
这题里BlockingQueue有个很大的坑:就是BlockingQueue的put()或者take()如果取不到元素是阻塞的。一定要做好中断情况下,退出循环,结束线程的准备。下面程序中要是没有那个break,当某一次BlockingQueue的put()被终止后,马上会尝试下一次,然后继续阻塞。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.io.*;
public class Exercise28 {
private static BlockingQueue<LiftOff28> rockets;
public static class LiftOffRunner implements Runnable {
public LiftOffRunner(BlockingQueue<LiftOff28> queue) {
rockets = queue;
}
public void run() {
try {
while(!Thread.interrupted()) {
LiftOff28 rocket = rockets.take();
rocket.run(); // Use this thread
}
} catch(InterruptedException e) {
System.out.println("Waking from take()");
}
System.out.println(Thread.currentThread()+"Exiting LiftOffRunner");
}
}
public static class LiftOffFiller implements Runnable{
private int times=0;
public LiftOffFiller(int num){times=num;}
public void add(LiftOff28 lo) {
try {
rockets.put(lo);
} catch(InterruptedException e) {
System.out.println("Interrupted during put()");
Thread.currentThread().interrupt();
}
}
public void run(){
for(int i=0;i<times;i++){
if(Thread.interrupted()){
break; //保证当interrupt时,跳出循环。否则下一次BlockingQueue的put()会继续阻塞,线程无法退出。
}
add(new LiftOff28(5));
}
System.out.println(Thread.currentThread()+"Exiting LiftOffFiller!");
}
}
static void getkey() {
try {
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch(java.io.IOException e) {
throw new RuntimeException(e);
}
}
static void getkey(String message) {
System.out.println(message);
getkey();
}
static void test(String msg, BlockingQueue<LiftOff28> queue) throws InterruptedException{
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
Thread f=new Thread(new LiftOffFiller(5));
f.start();
getkey("Press ‘Enter’ (" + msg + ")");
t.interrupt();
f.interrupt();
System.out.println("Finished " + msg + " test");
}
public static void main(String[] args) throws InterruptedException{
test("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff28>());
test("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff28>(3));
test("SynchronousQueue", new SynchronousQueue<LiftOff28>());
}
}
总体思路很简单:
吐司有四种状态,什么都没涂,只涂了黄油,只涂了果酱,黄油果酱都涂过了。注意黄油果酱的顺序不重要,只要都涂过就可以吃。另外涂过的不可以重复涂。所以peanutButter()和jamme()两个方法都用switch根据不同状态处理。相当于有限状态机。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Toast29{
public static enum Status29{DRY, PBD, JMD, FINISH}
private static int count=0;
private int id;
private Status29 status=Status29.DRY;
public Toast29(){id=++count;}
public synchronized void peanutButter(){
switch(status){
case DRY:
status=Status29.PBD; break;
case PBD:
System.out.println("ERROR: "+this+" alread PeanutButtered!!!");break;
case JMD:
status=Status29.FINISH;break;
case FINISH:
System.out.println("ERROR: "+this+" alread finished!!!");break;
}
}
public synchronized void jamme(){
switch(status){
case DRY:
status=Status29.JMD; break;
case JMD:
System.out.println("ERROR: "+this+" alread Jammed!!!");break;
case PBD:
status=Status29.FINISH;break;
case FINISH:
System.out.println("ERROR: "+this+" alread finished!!!");break;
}
}
public Status29 getStatus(){return status;}
public void setStatus(Status29 s){status=s;}
public String toString(){return "Toast#"+id+": "+status;}
}
BlockingQueue换个名字。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class ToastQueue29 extends LinkedBlockingQueue<Toast29>{}
有两个这样专门生产原味吐司的线程,分别插进涂黄油队列和涂果酱队列。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Toaster29 implements Runnable{
private ToastQueue29 toastQueue;
private Random rand=new Random();
public Toaster29(ToastQueue29 tq){toastQueue=tq;}
public void run(){
try{
while(!Thread.interrupted()){
TimeUnit.MILLISECONDS.sleep(100+rand.nextInt(100));
Toast29 t=new Toast29();
System.out.println(t);
toastQueue.put(t);
}
}catch(InterruptedException ie){
System.out.println("Toaster29 interrupted!");
}finally{
System.out.println("Toaster29 exit!");
}
}
}
负责涂黄油的线程。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class PeanutButterer29 implements Runnable{
private ToastQueue29 toastQueue;
private ToastQueue29 peanutButterQueue;
public PeanutButterer29(ToastQueue29 tq, ToastQueue29 pbq){
toastQueue=tq;
peanutButterQueue=pbq;
}
public void run(){
try{
while(!Thread.interrupted()){
Toast29 t=toastQueue.take();
t.peanutButter();
System.out.println(t);
peanutButterQueue.put(t);
}
}catch(InterruptedException ie){
System.out.println("Toaster29 interrupted!");
}finally{
System.out.println("Toaster29 exit!");
}
}
}
负责涂果酱的线程。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Jammer29 implements Runnable{
private ToastQueue29 toastQueue;
private ToastQueue29 jammeQueue;
public Jammer29(ToastQueue29 tq, ToastQueue29 jmq){
toastQueue=tq;
jammeQueue=jmq;
}
public void run(){
try{
while(!Thread.interrupted()){
Toast29 t=toastQueue.take();
t.jamme();
System.out.println(t);
jammeQueue.put(t);
}
}catch(InterruptedException ie){
System.out.println("Toaster29 interrupted!");
}finally{
System.out.println("Toaster29 exit!");
}
}
}
从涂完黄油的队列里挑拣已经FINISH的和只涂了黄油的。只涂了黄油的再插入涂果酱流水线。FINISH的就等待被吃。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class PBChecker29 implements Runnable{
private ToastQueue29 peanutButteredQueue;
private ToastQueue29 jammeQueue;
private ToastQueue29 finishQueue;
public PBChecker29(ToastQueue29 pbdq, ToastQueue29 jmq, ToastQueue29 fq){
peanutButteredQueue=pbdq;
jammeQueue=jmq;
finishQueue=fq;
}
public void run(){
try{
while(!Thread.interrupted()){
Toast29 t=peanutButteredQueue.take();
switch(t.getStatus()){
case DRY:
System.out.println("ERROR: "+t+" still DRY, cannot PeanutButter!!!");break;
case PBD:
jammeQueue.put(t);break;
case JMD:
System.out.println("ERROR: "+t+" need to be PeanutButtered!!!");break;
case FINISH:
finishQueue.put(t);break;
}
}
}catch(InterruptedException ie){
System.out.println("Toaster29 interrupted!");
}finally{
System.out.println("Toaster29 exit!");
}
}
}
同理,从涂完果酱的队列里挑拣已经FINISH的和只涂了果酱的。只涂了果酱的再插入涂黄油流水线。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class JMChecker29 implements Runnable{
private ToastQueue29 jammedQueue;
private ToastQueue29 peanutButterQueue;
private ToastQueue29 finishQueue;
public JMChecker29(ToastQueue29 jmdq, ToastQueue29 pbq, ToastQueue29 fq){
jammedQueue=jmdq;
peanutButterQueue=pbq;
finishQueue=fq;
}
public void run(){
try{
while(!Thread.interrupted()){
Toast29 t=jammedQueue.take();
switch(t.getStatus()){
case DRY:
System.out.println("ERROR: "+t+" still DRY, cannot Jamme!!!");break;
case PBD:
System.out.println("ERROR: "+t+" need to be Jammed!!!");break;
case JMD:
peanutButterQueue.put(t);break;
case FINISH:
finishQueue.put(t);break;
}
}
}catch(InterruptedException ie){
System.out.println("Toaster29 interrupted!");
}finally{
System.out.println("Toaster29 exit!");
}
}
}
专门负责从成品吐司队列里拿来吃的线程。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Eater29 implements Runnable{
private ToastQueue29 finishQueue;
public Eater29(ToastQueue29 fq){
finishQueue=fq;
}
public void run(){
try{
while(!Thread.interrupted()){
Toast29 t=finishQueue.take();
if(t.getStatus()!=Toast29.Status29.FINISH){
System.out.println("ERROR: "+t+" not Finished!");
}else{
System.out.println(t+" YAMMY! YAMMY!");
}
}
}catch(InterruptedException ie){
System.out.println("Toaster29 interrupted!");
}finally{
System.out.println("Toaster29 exit!");
}
}
}
创建这5个队列,7个线程。一起运行。他们之间就靠BlockingQueue的协调一起工作。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise29{
public static void main(String[] args){
ToastQueue29 queuePB=new ToastQueue29();
ToastQueue29 queueJM=new ToastQueue29();
ToastQueue29 queuePBD=new ToastQueue29();
ToastQueue29 queueJMD=new ToastQueue29();
ToastQueue29 queueFINISH=new ToastQueue29();
ExecutorService es=Executors.newCachedThreadPool();
es.execute(new Toaster29(queuePB));
es.execute(new Toaster29(queueJM));
es.execute(new PeanutButterer29(queuePB,queuePBD));
es.execute(new Jammer29(queueJM,queueJMD));
es.execute(new PBChecker29(queuePBD,queueJM,queueFINISH));
es.execute(new JMChecker29(queueJMD,queuePB,queueFINISH));
es.execute(new Eater29(queueFINISH));
try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException ie){
System.out.println("Test Interrupted!");
}finally{
es.shutdownNow();
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise30 {
private static BlockingQueue<Character> bq=new LinkedBlockingQueue<Character>();
private static Random rand=new Random();
public static class Sender implements Runnable {
public void run() {
try {
while(true){
for(char c = 'A'; c <= 'z'; c++) {
bq.put(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
}
} catch(InterruptedException e) {
System.out.println(e + " Sender sleep interrupted");
}
}
}
public static class Receiver implements Runnable {
public void run() {
try {
while(true) {
System.out.print("Read: " + (char)bq.take() + ", ");
}
} catch(InterruptedException e) {
System.out.println(e + " Sender sleep interrupted");
}
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Sender());
exec.execute(new Receiver());
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();
}
}
经典的哲学家就餐问题。理论上用一个筷子筒还是有可能死锁的。因为拿筷子的动作分解为两步:拿第一根筷子,拿第二根筷子。最坏的情况每个哲学家的线程都在拿了第一根筷子的时候被挂起,结果还是每人拿了一根筷子。所以,我在拿筷子和还筷子两个动作上都加了互斥锁。这样拿两根筷子的动作有了原子性,就不可能存在所有人都拿着一根等另一根的情况。最糟糕的情况是,拿着一根筷子的哥们儿,永远吃不到。
后面一个问题,通过减少筷子,是不是可能死锁?只能说,当只有一根筷子的时候,谁都吃不成。这算不算也是死锁呢?
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class ChopstickBuck{
private final int num;
private volatile int availableNum;
public ChopstickBuck(int num){
this.num=num;
availableNum=num;
}
//只要拿两根筷子的动作是原子性的,就不会死锁
public synchronized void take() throws InterruptedException{
for(int i=0;i<2;i++){
while(availableNum==0){
wait();
}
availableNum--;
}
}
//同理
public synchronized void drop() throws InterruptedException{
availableNum++;availableNum++;
notifyAll();
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Philosopher implements Runnable {
private final int id;
private final int ponderFactor;
private Random rand = new Random();
private ChopstickBuck buck;
private void pause() throws InterruptedException {
if(ponderFactor == 0) return;
TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
}
public Philosopher(ChopstickBuck buck, int ident, int ponder) {
this.buck=buck;
id = ident;
ponderFactor = ponder;
}
public void run() {
try {
while(!Thread.interrupted()) {
System.out.println(this + " " + "thinking");
pause();
// Philosopher becomes hungry
System.out.println(this + " " + "want to eat");
buck.take();
System.out.println(this + " " + "eating");
pause();
buck.drop();
}
} catch(InterruptedException e) {
System.out.println(this + " " + "exiting via interrupt");
}
}
public String toString() { return "Philosopher " + id; }
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
public class Exercise31 {
public static void main(String[] args) throws Exception {
int ponder = 5;
if(args.length > 0){
ponder = Integer.parseInt(args[0]);
}
int size = 5;
if(args.length > 1){
size = Integer.parseInt(args[1]);
}
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
ChopstickBuck buck=new ChopstickBuck(5);
for(int i = 0; i < size; i++){
exec.execute(new Philosopher(buck , i, ponder));
}
if(args.length == 3 && args[2].equals("timeout")){
TimeUnit.SECONDS.sleep(5);
}else {
System.out.println("Press ‘Enter’ to quit");
System.in.read();
}
exec.shutdownNow();
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
import java.io.*;
/**
* 主测试线程会创建所有旋转门线程和一个总的控制线程。
*/
public class Exercise32 implements Runnable{
/**
* 静态成员
*/
//探测器容器
private static List<Entrance> entrances = new ArrayList<Entrance>();
//静态计算总次数系统,因为和非静态成员CountDownLatch无关
private static volatile int count=0;
public static synchronized void sumEntrances() {
for(Entrance entrance : entrances){
count += entrance.getValue();
}
}
public static int getTotalCount() {
return count;
}
/**
* 非静态成员。关键就是初始化本次实验的探测器的总数。
*/
private final CountDownLatch latch;
private final int entranceNum;
private Runnable sumCounter;
public Exercise32(int num){
latch=new CountDownLatch(num);
entranceNum=num;
new Thread(this).start(); //构造器运行实验
}
public void run(){
System.out.println("Press Entry key to stop!");
ExecutorService execSum=Executors.newCachedThreadPool();
ExecutorService execEntrance = Executors.newCachedThreadPool();
execSum.execute(new SumCounter()); //总数统计线程
execSum.shutdown();
for(int i = 0; i < entranceNum; i++){ //每个旋转门的计数线程
execEntrance.execute(new Entrance(i));
}
try{
System.in.read();
}catch(IOException ioe){
System.out.println(ioe);
}
execEntrance.shutdownNow();
}
/**
* 不能是静态套嵌类,因为还是依赖于外部总控制器实例的latch。
*/
public class Entrance implements Runnable {
private int number = 0;
private final int id;
public Entrance(int id) {
this.id = id;
entrances.add(this);
}
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
++number;
}
System.out.println(this);
TimeUnit.MILLISECONDS.sleep(100);
}
}catch(InterruptedException e) {
System.out.println(this+"Sleep interrupted");
}finally{
System.out.println("Stopping " + this);
latch.countDown();
}
}
public synchronized int getValue() { return number; }
public String toString() {
return "Entrance " + id + ": " + getValue();
}
}
public class SumCounter implements Runnable{
public void run(){
try{
latch.await();
sumEntrances();
System.out.println("------------------------");
System.out.println("TOTAL: "+count);
}catch(InterruptedException ie){
System.out.println("Summer interrupted!");
}
}
}
public static void main(String[] args) throws Exception {
Exercise32 ex=new Exercise32(5);
}
}
所有任务都继承Delayed接口,插入DelayQueue。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise33 {
/**
* 传统任务
*/
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized String getThermostat() {
return thermostat;
}
public synchronized void setThermostat(String value) {
thermostat = value;
}
//delayed task
private static class DelayedTask implements Delayed{
private long trigger;
public DelayedTask(long time){trigger=time;}
public long getDelay(TimeUnit unit){
return unit.convert(trigger-System.nanoTime(),unit);
}
public int compareTo(Delayed arg){
DelayedTask dt=(DelayedTask)arg;
if(trigger<dt.trigger){return -1;}
if(trigger>dt.trigger){return 1;}
return 0;
}
public void reset(long nanoTime){
trigger=System.nanoTime()+nanoTime;
}
}
//delayed events
class LightOn extends DelayedTask implements Runnable{
public LightOn(long delay){
super(delay);
}
public void run() {
System.out.println("Turning on lights");
light = true;
}
}
class LightOff extends DelayedTask implements Runnable{
public LightOff(long delay){
super(delay);
}
public void run() {
System.out.println("Turning off lights");
light = false;
}
}
class WaterOn extends DelayedTask implements Runnable{
public WaterOn(long delay){
super(delay);
}
public void run() {
System.out.println("Turning greenhouse water on");
water = true;
}
}
class WaterOff extends DelayedTask implements Runnable{
public WaterOff(long delay){
super(delay);
}
public void run() {
System.out.println("Turning greenhouse water off");
water = false;
}
}
class ThermostatNight extends DelayedTask implements Runnable{
public ThermostatNight(long delay){
super(delay);
}
public void run() {
System.out.println("Thermostat to night setting");
setThermostat("Night");
}
}
class ThermostatDay extends DelayedTask implements Runnable{
public ThermostatDay(long delay){
super(delay);
}
public void run() {
System.out.println("Thermostat to day setting");
setThermostat("Day");
}
}
class Bell extends DelayedTask implements Runnable{
public Bell(long delay){
super(delay);
}
public void run() { System.out.println("Bing!"); }
}
class Terminate extends DelayedTask implements Runnable{
public Terminate(long delay){
super(delay);
}
public void run() {
System.out.println("Terminating");
es.shutdownNow();
System.out.println("Executor shutdown!");
new Thread() {
public void run() {
for(DataPoint d : data)
System.out.println(d);
}
}.start();
}
}
//delay Queue
private DelayQueue<DelayedTask> dq=new DelayQueue<DelayedTask>();
private ExecutorService es=Executors.newCachedThreadPool();
public ExecutorService getExecutor(){return es;}
public void setTask(DelayedTask dt){
dq.put(dt);
}
public void scheduled(){
try{
int num=1;
while(dq.size()>0){
synchronized(this){
es.execute((Runnable)dq.take());
System.out.println("Thread#"+num);
TimeUnit.SECONDS.sleep(1);
num++;
}
}
es.shutdownNow();
}catch(InterruptedException ie){
System.out.println("Executor interrupted!");
}finally{
System.out.println("Executor exit!");
}
}
public void repeat(int times){
Random rand=new Random();
try{
for(int i=1;i<=times;i++){
synchronized(this){
DelayedTask task=dq.take();
es.execute((Runnable)task);
System.out.println("Thread#"+i);
task.reset((long)(rand.nextInt(5000)));
dq.put(task);
TimeUnit.SECONDS.sleep(1);
}
}
es.shutdownNow();
}catch(InterruptedException ie){
System.out.println("Executor interrupted!");
}finally{
System.out.println("Executor exit!");
}
}
/**
* 新功能:数据收集
*/
static class DataPoint {
final Calendar time;
final float temperature;
final float humidity;
public DataPoint(Calendar d, float temp, float hum) {
time = d;
temperature = temp;
humidity = hum;
}
public String toString() {
return time.getTime() +
String.format(" temperature: %1$.1f humidity: %2$.2f",temperature, humidity);
}
}
private Calendar lastTime = Calendar.getInstance();
{ // Adjust date to the half hour
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}
private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>());
class CollectData extends DelayedTask implements Runnable {
public CollectData(long delay){
super(delay);
}
public void run() {
System.out.println("Collecting data");
synchronized(Exercise33.this) {
lastTime.set(Calendar.MINUTE,lastTime.get(Calendar.MINUTE) + 30);
if(rand.nextInt(5) == 4){
tempDirection = -tempDirection;
}
lastTemp = lastTemp + tempDirection * (1.0f + rand.nextFloat());
if(rand.nextInt(5) == 4){
humidityDirection = -humidityDirection;
}
lastHumidity = lastHumidity + humidityDirection * rand.nextFloat();
data.add(new DataPoint((Calendar)lastTime.clone(),lastTemp, lastHumidity));
}
}
}
public static void main(String[] args) throws Exception{
Exercise33 ex=new Exercise33();
//ex.setTask(ex.new Terminate(5000));
ex.setTask(ex.new Bell(1000));
ex.setTask(ex.new ThermostatNight(2000));
ex.setTask(ex.new LightOn(200));
ex.setTask(ex.new LightOff(400));
ex.setTask(ex.new WaterOn(600));
ex.setTask(ex.new WaterOff(800));
ex.setTask(ex.new ThermostatDay(1400));
ex.setTask(ex.new CollectData(500));
ex.scheduled();
//ex.repeat(20);
}
}
Exchanger有一个需要注意的地方:当一个线程提出交换的时候,如果它的patener线程已经提出了交换,那当前线程可以直接获得patener线程提供的对象,继续运行。这就是为什么练习34里,两个线程各持有10个盘子,但运行的结果是生产者先生产20个煎饼,然后消费者再吃20个煎饼。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise34 {
class ExchangerProducer<T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> holder;
ExchangerProducer(Exchanger<List<T>> exchg,Generator<T> gen, List<T> holder) {
exchanger = exchg;
generator = gen;
this.holder = holder;
}
public void run() {
try {
while(!Thread.interrupted()) {
for(int i = 0; i < Exercise34.size; i++)
holder.add(generator.next());
// Exchange full for empty:
holder = exchanger.exchange(holder);
}
} catch(InterruptedException e) {
System.out.println("Producer Thread interrupted!");
} finally{
System.out.println("Producer Thread exit!");
}
}
}
class ExchangerConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;
ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){
exchanger = ex;
this.holder = holder;
}
public void run() {
try {
while(!Thread.interrupted()) {
holder = exchanger.exchange(holder);
for(T x : holder) {
value = x; // Fetch out value
holder.remove(x); // OK for CopyOnWriteArrayList
System.out.println(x+" eaten!");
}
}
} catch(InterruptedException e) {
System.out.println("Consumer Thread interrupted!");
} finally{
System.out.println("Consumer Thread exit!");
System.out.println("Final value: " + value);
}
}
}
static int count=0;
class Pancake{
private int id;
public Pancake(){
id=++count;
System.out.println(this+" cooked!");
}
public String toString(){return "Pancake#"+id;}
}
interface Generator<T>{public T next();}
class Oven implements Generator<Pancake>{
public Pancake next(){
return new Pancake();
}
}
static int size = 10;
static int delay = 5; // Seconds
public static void main(String[] args) throws Exception {
if(args.length > 0){
size = new Integer(args[0]);
}
if(args.length > 1){
delay = new Integer(args[1]);
}
Exercise34 ex=new Exercise34();
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<List<Pancake>> xc = new Exchanger<List<Pancake>>();
List<Pancake>
producerPlate = new CopyOnWriteArrayList<Pancake>(),
consumerPlate = new CopyOnWriteArrayList<Pancake>();
exec.execute(ex.new ExchangerProducer<Pancake>(xc,ex.new Oven(), producerPlate));
exec.execute(ex.new ExchangerConsumer<Pancake>(xc,consumerPlate));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}
WebClient只要连接申请队列没有满,就一直加快创建新申请的频率。最后稳定下来的频率就是对方WebServer的最大连接申请吞吐量。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class WebClient{
private static Random rand = new Random();
//web请求
static class Request {
private final int serviceTime;
public Request(int tm) { serviceTime = tm; }
public int getServiceTime() { return serviceTime; }
public String toString() {
return "[" + serviceTime + "]";
}
}
//web请求队列
static class RequestLine extends ArrayBlockingQueue<Request> {
private int maxSize;
public RequestLine(int maxLineSize) {
super(maxLineSize);
maxSize=maxLineSize;
}
public int getMax(){return maxSize;}
public String toString() {
if(this.size() == 0){
return "[Empty]";
}
StringBuilder result = new StringBuilder();
for(Request r : this){
result.append(r);
}
return result.toString();
}
}
//web请求生成器,自动根据服务器吞吐量调节生成频率。
static class RequestGenerator implements Runnable {
private int period=100; //初始频率,每秒一个请求
private RequestLine requests;
public RequestGenerator(RequestLine rq) {
requests = rq;
}
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(period));
try{
requests.add(new Request(rand.nextInt(1000)));
if(period>1){
period-=1;
}
}catch(IllegalStateException ise){ //队列满了
period+=1;
}
}
} catch(InterruptedException e) {
System.out.println("CustomerGenerator interrupted");
}
System.out.println("CustomerGenerator terminating");
}
public String showBalance(){
StringBuilder sb=new StringBuilder();
float freq=1000/(float)period;
String str=String.format("%.2f",freq);
for(int i=0;i<(int)(freq);i++){
sb=sb.append("+");
}
return sb.append(" "+str).toString();
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class WebServer{
private static int handlerCounter = 0;
//银行出纳,web请求处理器
static class RequestHandler implements Runnable, Comparable<RequestHandler> {
private final int id = handlerCounter++;
private int requestServed = 0;
private WebClient.RequestLine requests;
private boolean servingRequestLine = true;
public RequestHandler(WebClient.RequestLine rq) { requests = rq; }
public void run() {
try {
while(!Thread.interrupted()) {
WebClient.Request r = requests.take();
TimeUnit.MILLISECONDS.sleep(r.getServiceTime());
synchronized(this) {
requestServed++;
while(!servingRequestLine){
wait();
}
}
}
} catch(InterruptedException e) {
System.out.println(this + "interrupted");
}
System.out.println(this + "terminating");
}
public synchronized void hangs() {
requestServed = 0;
servingRequestLine = false;
}
public synchronized void serveRequestLine() {
assert !servingRequestLine:"already serving: " + this;
servingRequestLine = true;
notifyAll();
}
public String toString() { return "Request Handler " + id + " "; }
public String shortString() { return "H" + id; }
// Used by priority queue:
public synchronized int compareTo(RequestHandler other) {
return requestServed < other.requestServed ? -1 : (requestServed == other.requestServed ? 0 : 1);
}
}
//银行经理控制出纳数量,服务器大脑负载调节器
static class Server implements Runnable {
private ExecutorService exec;
private WebClient.RequestLine requests;
private PriorityQueue<RequestHandler> workingHandlers = new PriorityQueue<RequestHandler>();
private Queue<RequestHandler> handlersDoingOtherThings = new LinkedList<RequestHandler>();
private int adjustmentPeriod;
private int maxHandler;
public Server(ExecutorService e, WebClient.RequestLine requests, int adjustmentPeriod, int maxHandler) {
exec = e;
this.requests = requests;
this.adjustmentPeriod = adjustmentPeriod;
this.maxHandler=maxHandler;
// Start with a single handler:
RequestHandler handler= new RequestHandler(requests);
exec.execute(handler);
workingHandlers.add(handler);
}
public int getHandlerNumber(){return workingHandlers.size()+handlersDoingOtherThings.size();}
public void adjustHandlerNumber() {
// This is actually a control system. By adjusting
// the numbers, you can reveal stability issues in
// the control mechanism.
// If line is too long, add another handler:
if(requests.size() / (float)requests.getMax() > 0.5f) {
// If handlers are on break or doing
// another job, bring one back:
if(handlersDoingOtherThings.size() > 0) {
RequestHandler handler = handlersDoingOtherThings.remove();
handler.serveRequestLine();
workingHandlers.offer(handler);
return;
}
// Else start a new request handler
if(workingHandlers.size()<maxHandler){
RequestHandler handler = new RequestHandler(requests);
exec.execute(handler);
workingHandlers.add(handler);
return;
}
}
// If line is short enough, remove a handler:
if(workingHandlers.size() > 1 && requests.size() / (float)requests.getMax() < 0.5f){
reassignOneHandler();
}
// If there is no line, we only need one handler:
if(requests.size() == 0){
while(workingHandlers.size() > 1){
reassignOneHandler();
}
}
}
// Give a request handler a different job or a break:
private void reassignOneHandler() {
RequestHandler handler = workingHandlers.poll();
handler.hangs();
handlersDoingOtherThings.offer(handler);
}
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(adjustmentPeriod);
adjustHandlerNumber();
/**
System.out.print(requests + " { ");
for(RequestHandler handler : workingHandlers)
System.out.print(handler.shortString() + " ");
System.out.println("}");
*/
}
} catch(InterruptedException e) {
System.out.println(this + "interrupted");
}
System.out.println(this + "terminating");
}
public String toString() { return "["+workingHandlers.size()+"/"+handlersDoingOtherThings.size()+"/"+maxHandler+"]"; }
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise35 {
//准备运行测试
static final int MAX_LINE_SIZE = 100;
static final int ADJUSTMENT_PERIOD = 100;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// If line is too long, customers will leave:
WebClient.RequestLine requests = new WebClient.RequestLine(MAX_LINE_SIZE);
WebClient.RequestGenerator gen=new WebClient.RequestGenerator(requests);
exec.execute(gen);
// Manager will add and remove tellers as necessary:
WebServer.Server server=new WebServer.Server(exec, requests, ADJUSTMENT_PERIOD, 30);
exec.execute(server);
// print the result
exec.execute(new Runnable(){
public void run(){
try{
while(!Thread.interrupted()){
TimeUnit.MILLISECONDS.sleep(20);
System.out.println(gen.showBalance()+" ["+requests.size()+"/"+MAX_LINE_SIZE+"]"+server);
}
}catch(InterruptedException ie){
System.out.println("Print interrupted!");
}finally{
System.out.println("Printer exit!");
}
}
});
if(args.length > 0){ // Optional argument
TimeUnit.SECONDS.sleep(new Integer(args[0]));
}else {
System.out.println("Press ‘Enter’ to quit");
System.in.read();
}
exec.shutdownNow();
}
}
十分题。做的时候,场景又更加被复杂化了一点。变成一道15分题。
先说一下做这道题时体会到的几条设计原则:
餐馆的整体大致流程如下:
线程:
模块拆分(这次运用了层层增加功能的扩展法):
所有的客人,服务员,厨师线程,通过TableQueue,ServiceQueue,OrderQueue,PlateQueue,SpaceQueue几个同步带锁队列同步互动。不需要再加独占锁。
检查会不会有死锁的情况。比如服务员在监听客人排队点餐,和监听上菜队列时,如果阻塞,有可能导致死锁。因为如果点菜时阻塞:当没新客人的时候服务员死等,会死锁,永远不上菜。如果监听上菜时死锁:新客人都在排队,厨师就没菜做,服务员等厨师就是死锁。
所以最后,服务员在ServiceQueue和PlateQueue两个队列上只是轮询忙等,不阻塞。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.*;
public interface Food36 { //两层枚举的第一层
public float getPrice();
enum Appetizer implements Food36 {
SALAD(15.5f), SOUP(10.8f), SPRING_ROLLS(8.8f);
private float price=0.0f;
private Appetizer(float p){price=p;}
public String toString(){return EnumPrinter.getText(this)+"("+price+")";}
public float getPrice(){return price;}
}
enum MainCourse implements Food36 {
LASAGNE(28.5f), BURRITO(35.9f), PAD_THAI(16.9f),
LENTILS(21.0f), HUMMOUS(12.5f), VINDALOO(39.9f);
private float price=0.0f;
private MainCourse(float p){price=p;}
public String toString(){return EnumPrinter.getText(this)+"("+price+")";}
public float getPrice(){return price;}
}
enum Dessert implements Food36 {
TIRAMISU(9.9f), GELATO(5.6f), BLACK_FOREST_CAKE(8.7f),
FRUIT(15.5f), CREME_CARAMEL(5.5f);
private float price=0.0f;
private Dessert(float p){price=p;}
public String toString(){return EnumPrinter.getText(this)+"("+price+")";}
public float getPrice(){return price;}
}
enum Coffee implements Food36 {
BLACK_COFFEE(6.5f), DECAF_COFFEE(4.3f), ESPRESSO(5.0f),
LATTE(3.1f), CAPPUCCINO(4.3f), TEA(2.8f), HERB_TEA(3.0f);
private float price=0.0f;
private Coffee(float p){price=p;}
public String toString(){return EnumPrinter.getText(this)+"("+price+")";}
public float getPrice(){return price;}
}
public class EnumPrinter{ //依赖注入
public static String getText(Enum e){return "["+e.getClass().getSimpleName()+"-"+e.name()+"]";}
}
public static void main(String[] args){
System.out.println(Appetizer.SALAD);
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.*;
public enum Course36 {
/**
* 枚举工厂:两层枚举的第二层
*/
APPETIZER(Food36.Appetizer.class),
MAINCOURSE(Food36.MainCourse.class),
DESSERT(Food36.Dessert.class),
COFFEE(Food36.Coffee.class);
/**
* 抽象构造
*/
private static final Random rand=new Random();
private Food36[] values;
private Course36(Class<? extends Food36> kind) {
values = kind.getEnumConstants();
}
public Food36 randomFood() {
return values[rand.nextInt(values.length)];
}
/**
* 对象管理
*/
public static Course36 randomType(){
return values()[rand.nextInt(values().length)];
}
public static Food36 random(){
return randomType().randomFood();
}
public static Food36 randomAppet(){
return APPETIZER.randomFood();
}
public static Food36 randomMain(){
return MAINCOURSE.randomFood();
}
public static Food36 randomDessert(){
return DESSERT.randomFood();
}
public static Food36 randomCoffee(){
return COFFEE.randomFood();
}
public static void main(String[] args){
Food36 food=Course36.random();
System.out.println(food);
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.*;
public class Order36 extends Thing36{
private static int count=0;
private final int id=++count;
private final String name="Order";
public String toString(){return name+" #"+id+" "+food;}
private final Food36 food;
private final OrderTicket36 ticket;
public Order36(Food36 f, OrderTicket36 t){
food=f;
ticket=t;
}
public Food36 getFood(){return food;}
public OrderTicket36 getTicket(){return ticket;}
public float getPrice(){return food.getPrice();}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class OrderTicket36 {
private static final Random rand=new Random();
private static int count=0;
private final int id=++count;
private final String name="Order ticket";
public String toString(){return name+" #"+id;}
private List<Order36> orders=new ArrayList<Order36>();
private final Table36 table;
private final Client36 client;
private final WaitPerson36 waiter;
private float totalPrice=0.0f;
public OrderTicket36(Table36 t, Client36 c, WaitPerson36 w){
table=t;
client=c;
waiter=w;
}
public OrderTicket36(Client36 c, WaitPerson36 w){
table=c.getTable();
client=c;
waiter=w;
}
public Table36 getTable(){return table;}
public Client36 getClient(){return client;}
public float getTotalPrice(){return totalPrice;}
public WaitPerson36 getWaiter(){return waiter;}
public void addOrder(Order36 order){
orders.add(order);
totalPrice+=order.getPrice();
}
public List<Order36> getOrders(){return orders;}
public String readTicket(){
StringBuilder sb=new StringBuilder();
sb.append(client+" wants to eat: ");
for(Order36 order:orders){
sb.append(order.toString()+"; ");
}
return sb.toString();
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Table36 {
private static int count=0;
private final int id=++count;
private final String name="Table";
public String toString(){return name+" #"+id;}
private final int MAX_CLIENT;
private volatile int available;
private BlockingQueue<Order36> orderTicket;
private Restaurant36 restaurant; //反向依赖注入
public Table36(int size,Restaurant36 restaurant){
MAX_CLIENT=size;
available=size;
orderTicket=new LinkedBlockingQueue<Order36>();
this.restaurant=restaurant;
System.out.println(this+" created! "+left()+" seats in total!");
}
public boolean available(){return available>0;}
public int left(){return available;}
public synchronized void oneSit(Client36 client){
if(!available()){System.out.println("Error, "+this+" not available for "+client);return;}
--available;
if(available()){
restaurant.moreTable(this);
}
}
public synchronized void oneLeave(Client36 client){
if(available>=MAX_CLIENT){System.out.println("Error, no client on "+this+"!");return;}
++available;
if(available==1){
restaurant.moreTable(this);
}
}
public static void main(String[] args){
Table36 table=new Table36(3,new Restaurant36(5,5));
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.*;
public class Plate36{
private static int count=0;
private final int id=++count;
private final String name="Plate";
public String toString(){return name+" #"+id+": "+chef+"->"+order;}
private final Order36 order;
private final Chef36 chef;
private final Table36 table;
public Plate36(Order36 o, Chef36 c, Table36 t){
order=o;
chef=c;
table=t;
}
public Order36 getOrder(){return order;}
public Chef36 getChef(){return chef;}
public Table36 getTable(){return table;}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Client36 implements Runnable{
private static int count=0;
private final int id=++count;
private final String name="Client";
public String toString(){return name+" #"+id;}
private Restaurant36 restaurant; //反向依赖注入
private Table36 table=null; //反向依赖注入
private boolean foundTable=false;
private BlockingQueue<Client36> serviceQueue;
private SynchronousQueue<Plate36> hisSpace; //普通组合。每个客人只有一个餐位,菜要一道一道吃。
private OrderTicket36 ticket;
private int finished;
public Client36(Restaurant36 r){
restaurant=r;
serviceQueue=r.getServiceQueue();
hisSpace=new SynchronousQueue<Plate36>();
finished=0;
}
public synchronized void lookForTable(){
try{
table=restaurant.getTables().take(); //访问同步阻塞队列
sit(table);
}catch(InterruptedException ie){
System.out.println(this+" is interrupted in the queue of table!");
}
}
public boolean foundTable(){return table!=null;}
public Table36 getTable(){return table;}
public BlockingQueue<Plate36> getSpace(){return hisSpace;}
public void keepTicket(OrderTicket36 t){ticket=t;}
public OrderTicket36 getTicket(){return ticket;}
public boolean checkOrder(Order36 o){
if(ticket==null){return false;}
return ticket.getOrders().contains(o);
}
public synchronized void sit(Table36 table){
table.oneSit(this);
System.out.println(this+" sit on "+table+". "+"["+table.left()+" left]");
}
public synchronized void leave(Table36 table){
table.oneLeave(this);
System.out.println(this+" leave "+table+". ["+table.left()+" left]");
}
public boolean finishEat(){return finished==ticket.getOrders().size();}
public void pay(){
if(ticket==null){System.out.println("Sorry, I didn't eat anything!");return;}
if(!finishEat()){System.out.println("Sorry, I didn't finish the dinner!");return;}
System.out.println(this+" Pay "+ticket.getTotalPrice()+". Bye-Bye!");
}
public void run(){
try{
//look for table
lookForTable();
//wait in serviceQueue
serviceQueue.put(this);
//等点餐小票
while(ticket==null){
Thread.yield();
TimeUnit.MILLISECONDS.sleep(1);
}
//等着上菜
while(!finishEat()){
Plate36 plate=hisSpace.take(); //阻塞
if(checkOrder(plate.getOrder())){
System.out.println(this+" is eating "+plate+"...");
finished++;
TimeUnit.MILLISECONDS.sleep(1);
}
}
//吃完再付账
pay();
}catch(InterruptedException ie){
System.out.println(this+" Eat interrupted!");
}finally{
//leave table。不管吃没吃完都要走。
if(foundTable()){
leave(table);
}
}
}
public static void main(String[] args){
System.out.println(Course36.random());
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class WaitPerson36 implements Runnable{
private static final Random rand=new Random();
private static int count=0;
private final int id=++count;
private final String name="Wait Person";
public String toString(){return name+" #"+id;}
private Restaurant36 restaurant; //反向依赖注入
private BlockingQueue<Client36> serviceQueue; //反向依赖注入
private BlockingQueue<OrderTicket36> orderQueue; //反向依赖注入
private BlockingQueue<Plate36> plateQueue; //反向依赖注入
public WaitPerson36(Restaurant36 r){
restaurant=r;
serviceQueue=restaurant.getServiceQueue();
orderQueue=restaurant.getOrderQueue();
plateQueue=restaurant.getPlateQueue();
}
public void run(){
try{
while(!Thread.interrupted()){
//点菜
Client36 client=serviceQueue.poll(1,TimeUnit.MILLISECONDS); //不阻塞,等1微秒(否则没新客人的时候服务员死等,会死锁,永远不上菜。)
if(client!=null){
OrderTicket36 ticket=new OrderTicket36(client,this);
ticket.addOrder(new Order36(Course36.randomAppet(),ticket));
ticket.addOrder(new Order36(Course36.randomMain(),ticket));
ticket.addOrder(new Order36(Course36.randomDessert(),ticket));
ticket.addOrder(new Order36(Course36.randomCoffee(),ticket));
System.out.println(ticket.readTicket());
orderQueue.put(ticket); //无界队列,不阻塞
client.keepTicket(ticket);
}
TimeUnit.MILLISECONDS.sleep(1);
//上菜
Plate36 plate=plateQueue.poll(1,TimeUnit.MILLISECONDS); //不阻塞,等1微秒(否则,新客人都在排队,厨师就没菜做,服务员等厨师就是死锁。)
if(plate!=null){
Client36 client2=plate.getOrder().getTicket().getClient();
client2.getSpace().put(plate); //阻塞。不会死锁。
}
}
}catch(InterruptedException ie){
System.out.println(this+" Service Interrupted!");
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Chef36 implements Runnable{
private static int count=0;
private final int id=++count;
private final String name="Chef";
public String toString(){return name+" #"+id;}
private Restaurant36 restaurant;
private BlockingQueue<OrderTicket36> orders;
private BlockingQueue<Plate36> plates;
public Chef36(Restaurant36 r){
restaurant=r;
orders=restaurant.getOrderQueue();
plates=restaurant.getPlateQueue();
}
public void run(){
try{
while(!Thread.interrupted()){
OrderTicket36 ticket=orders.take(); //阻塞
List<Order36> list=ticket.getOrders();
for(Order36 order:list){
Plate36 plate=new Plate36(order,this,ticket.getTable());
plates.put(plate); //无界队列,不阻塞
TimeUnit.MILLISECONDS.sleep(1);
System.out.println(plate+" for "+ticket.getClient()+"at "+ticket.getTable()+" prepared!");
}
}
}catch(InterruptedException ie){
System.out.println(this+" interrupted!");
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Restaurant36 implements Runnable{
private static final Random rand=new Random();
private static int count=0;
private final int id=++count;
private final String name="Restaurant";
public String toString(){return name+" #"+id;}
private int tableNum;
private int tableSize;
private BlockingQueue<Table36> tables;
private BlockingQueue<Client36> serviceQueue;
private BlockingQueue<OrderTicket36> orderQueue;
private BlockingQueue<Plate36> plateQueue;
public Restaurant36(int tableNum, int tableSize){
this.tableNum=tableNum;
this.tableSize=tableSize;
tables=new ArrayBlockingQueue<Table36>(tableNum);
serviceQueue=new LinkedBlockingQueue<Client36>();
orderQueue=new LinkedBlockingQueue<OrderTicket36>();
plateQueue=new LinkedBlockingQueue<Plate36>();
}
public BlockingQueue<Table36> getTables(){return tables;}
public BlockingQueue<Client36> getServiceQueue(){return serviceQueue;}
public BlockingQueue<OrderTicket36> getOrderQueue(){return orderQueue;}
public BlockingQueue<Plate36> getPlateQueue(){return plateQueue;}
public void moreTable(Table36 table){
try{
tables.put(table); //访问同步阻塞队列
}catch(InterruptedException ie){
System.out.println("Table Insertion interrupted!");
}
}
public void run(){
for(int i=0;i<tableNum;i++){
moreTable(new Table36(rand.nextInt(tableSize)+1,this));
if(Thread.currentThread().interrupted()){
System.out.println("Restaurant initialization interrupted!");
break;
}
}
}
}
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
import java.io.*;
public class Exercise36{
public static void main(String[] args){
System.out.println("Press enter to stop!");
//executor
ExecutorService exec=Executors.newCachedThreadPool();
//restaurant
Restaurant36 restaurant=new Restaurant36(5,10);
exec.execute(restaurant);
//client
try{
//Wait Person
for(int i=0;i<10;i++){
exec.execute(new WaitPerson36(restaurant));
}
for(int i=0;i<5;i++){
exec.execute(new Chef36(restaurant));
}
for(int i=0;i<100;i++){
exec.execute(new Client36(restaurant));
TimeUnit.MILLISECONDS.sleep(1);
}
System.in.read();
}catch(IOException ioe){
System.out.println("Error in Standard input!");
}catch(InterruptedException ie){
System.out.println("Client creation interrupted!");
}
exec.shutdownNow();
}
}
测试了一个餐馆,10个服务员,5个厨师,100个客人,5张桌子,每张桌子1-10个位置。输出的结果就像下面这样:
Press enter to stop!
Table #1 created! 8 seats in total!
Table #2 created! 2 seats in total!
Table #3 created! 1 seats in total!
Table #4 created! 2 seats in total!
Table #5 created! 10 seats in total!
Client #1 sit on Table #1. [7 left]
Client #2 sit on Table #2. [1 left]
Client #3 sit on Table #3. [0 left]
Client #4 sit on Table #4. [1 left]
Client #5 sit on Table #5. [9 left]
Client #6 sit on Table #1. [6 left]
Client #7 sit on Table #2. [0 left]
Client #1 wants to eat: Order #1 [Appetizer-SPRING_ROLLS](8.8); Order #4 [MainCourse-HUMMOUS](12.5); Order #8 [Dessert-GELATO](5.6); Order #12 [Coffee-HERB_TEA](3.0);
Client #5 wants to eat: Order #2 [Appetizer-SALAD](15.5); Order #5 [MainCourse-LENTILS](21.0); Order #10 [Dessert-TIRAMISU](9.9); Order #13 [Coffee-BLACK_COFFEE](6.5);
Client #6 wants to eat: Order #21 [Appetizer-SOUP](10.8); Order #22 [MainCourse-HUMMOUS](12.5); Order #23 [Dessert-GELATO](5.6); Order #24 [Coffee-LATTE](3.1);
Client #7 wants to eat: Order #25 [Appetizer-SPRING_ROLLS](8.8); Order #26 [MainCourse-PAD_THAI](16.9); Order #27 [Dessert-BLACK_FOREST_CAKE](8.7); Order #28 [Coffee-LATTE](3.1);
Client #3 wants to eat: Order #17 [Appetizer-SOUP](10.8); Order #18 [MainCourse-VINDALOO](39.9); Order #19 [Dessert-TIRAMISU](9.9); Order #20 [Coffee-DECAF_COFFEE](4.3);
Client #4 wants to eat: Order #3 [Appetizer-SALAD](15.5); Order #6 [MainCourse-PAD_THAI](16.9); Order #9 [Dessert-FRUIT](15.5); Order #14 [Coffee-BLACK_COFFEE](6.5);
Client #2 wants to eat: Order #7 [Appetizer-SOUP](10.8); Order #11 [MainCourse-PAD_THAI](16.9); Order #15 [Dessert-FRUIT](15.5); Order #16 [Coffee-TEA](2.8);
Client #8 sit on Table #4. [0 left]
Client #1 is eating Plate #1: Chef #1->Order #1 [Appetizer-SPRING_ROLLS](8.8)...
Client #8 wants to eat: Order #29 [Appetizer-SALAD](15.5); Order #30 [MainCourse-LENTILS](21.0); Order #31 [Dessert-BLACK_FOREST_CAKE](8.7); Order #32 [Coffee-BLACK_COFFEE](6.5);
Client #5 is eating Plate #2: Chef #2->Order #2 [Appetizer-SALAD](15.5)...
Client #6 is eating Plate #3: Chef #3->Order #21 [Appetizer-SOUP](10.8)...
Client #7 is eating Plate #4: Chef #4->Order #25 [Appetizer-SPRING_ROLLS](8.8)...
Client #3 is eating Plate #5: Chef #5->Order #17 [Appetizer-SOUP](10.8)...
Plate #1: Chef #1->Order #1 [Appetizer-SPRING_ROLLS](8.8) for Client #1at Table #1 prepared!
Client #9 sit on Table #5. [8 left]
... ...
... ...
这里需要注意CyclicBarrier栅栏是必须的。不然会出现两种问题:第一,hire()只是把某个Robot绑定某个Assembler,并激活这个robot。但robot实际操作完成还需要一段时间。如果有robot没有完成工作,但Assembler已经开始装载下一辆汽车,那这个robot装配的部件会装到下一辆车上,因为robot是绑定Assembler的。第二,只要Assembler完成激活所有robot,就算部分robot的工作还没完成,半成品汽车也会被送入finishQueue。最后Reporter打印的就是未完成品。
所以这个汽车装配的场景必须保证所有机器人都成功装上他负责的部件,才能把成品送入成品仓库。然后组装器才能再加载新的毛坯车。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
import java.io.*;
import java.lang.reflect.*;
public class Exercise37{
/**
* Car
*/
private int carId=0;
public class Car{
private int id;
private boolean engine=false, drivenTrain=false, wheels=false, exhaust=false, body=false, fender=false;
public Car(int num){id=num;}
public Car(){id=++carId;}
public int getId(){return id;}
public void addEngine(){engine=true;}
public void addDriveTrain(){drivenTrain=true;}
public void addWheels(){wheels=true;}
public void addExhaust(){exhaust=true;}
public void addBody(){body=true;}
public void addFender(){fender=true;}
public String toString(){
return "[Car #"+id+(engine? " Engine":"")+(drivenTrain? " DrivenTrain":"")+(wheels? " Wheels":"")+(exhaust? " Exhaust":"")+(body? " Body":"")+(fender? " Fender":"")+"]";
}
}
/**
* ChassisBuilder
*/
public class ChassisBuilder implements Runnable{
//carqueue
private CarQueue carQueue;
public ChassisBuilder(CarQueue queue){
carQueue=queue;
}
//Thread
public void run(){
try{
while(!Thread.interrupted()){
Car car=new Car();
System.out.println(car+" created!");
carQueue.put(car); //blocked
System.out.println(car+" into the car queue.");
TimeUnit.MILLISECONDS.sleep(1000);
}
System.out.println("CharssisBuilder exit correctly!");
}catch(InterruptedException ie){
System.out.println("CharssisBuilder is interrupted!");
}
}
}
/**
* CarQueue
*/
public class CarQueue extends LinkedBlockingQueue<Car>{
private static final long serialVersionUID=0l;
}
/**
* Robot
*/
private int robotId=0;
public abstract class Robot implements Runnable{
protected int id=++robotId;
private RobotPool pool;
public Robot(RobotPool p){pool=p;}
public String toString(){return "Robot #"+id+": "+getClass().getSimpleName();}
public synchronized void powerDown() throws InterruptedException{ //ready to work
engage=false;
assembler=null;
pool.release(this);
System.out.println(this+" is ready to work!");
while(engage==false){
wait();
}
}
protected Assembler assembler;
public Robot assignAssembler(Assembler a){
assembler=a;
System.out.println(this+" binding with "+a);
return this;
}
protected boolean engage=false;
public synchronized void engage(){engage=true;notifyAll();} //finish powerdown
public abstract void performService(); //work
public void run(){
try{
while(!Thread.interrupted()){
powerDown();
performService();
assembler.getBarrier().await();
}
}catch(InterruptedException ie){
System.out.println(this+" interrupted!");
}catch(BrokenBarrierException bbe){
System.out.println(this+" barrier broken!");
}
System.out.println(this+" exit!");
}
}
public class EngineRobot extends Robot{
public EngineRobot(RobotPool r){super(r);}
public void performService(){
System.out.println(this+" is installing engine for "+assembler.getCar());
assembler.getCar().addEngine();
}
}
public class DriveTrainRobot extends Robot{
public DriveTrainRobot(RobotPool r){super(r);}
public void performService(){
System.out.println(this+" is installing Drive Train for "+assembler.getCar());
assembler.getCar().addDriveTrain();
}
}
public class WheelRobot extends Robot{
public WheelRobot(RobotPool r){super(r);}
public void performService(){
System.out.println(this+" is installing wheel for "+assembler.getCar());
assembler.getCar().addWheels();
}
}
public class ExhaustRobot extends Robot{
public ExhaustRobot(RobotPool r){super(r);}
public void performService(){
System.out.println(this+" is installing exhaust system for "+assembler.getCar());
assembler.getCar().addExhaust();
}
}
public class BodyRobot extends Robot{
public BodyRobot(RobotPool r){super(r);}
public void performService(){
System.out.println(this+" is installing body of car for "+assembler.getCar());
assembler.getCar().addBody();
}
}
public class FenderRobot extends Robot{
public FenderRobot(RobotPool r){super(r);}
public void performService(){
System.out.println(this+" is installing fenders for "+assembler.getCar());
assembler.getCar().addFender();
}
}
/**
* Assembler
*/
private int assemblerId=0;
public class Assembler implements Runnable{
private int id=++assemblerId;
private Car car;
private CyclicBarrier barrier=new CyclicBarrier(7);
public CyclicBarrier getBarrier(){return barrier;}
private RobotPool pool;
private CarQueue carQueue, finishQueue;
public Assembler(RobotPool p, CarQueue c, CarQueue f){
pool=p;
carQueue=c;
finishQueue=f;
}
public String toString(){return "Assembler #"+id;}
public Car getCar(){return car;}
public void run(){
try{
while(!Thread.interrupted()){
car=carQueue.take(); //block
pool.hire(ExhaustRobot.class, this); //block
pool.hire(BodyRobot.class,this); //block
pool.hire(FenderRobot.class,this); //block
pool.hire(EngineRobot.class,this); //block
pool.hire(DriveTrainRobot.class,this); //block
pool.hire(WheelRobot.class,this); //block
barrier.await();
finishQueue.put(car); //block
}
}catch(InterruptedException ie){
System.out.println(this+" interrupted!");
}catch(BrokenBarrierException bbe){
System.out.println("Barrier brocken!");
}
System.out.println(this+" exit!");
}
}
/**
* RobotPool
*/
public class RobotPool{
private Set<Robot> pool=new HashSet<Robot>();
public synchronized void hire(Class<? extends Robot> robotType,Assembler a ) throws InterruptedException { //allow to interrupt the assembler
for(Robot r:pool){
if(r.getClass().equals(robotType)){
pool.remove(r);
r.assignAssembler(a);
r.engage();
return;
}
}
wait();
hire(robotType,a);
}
public void release(Robot r){pool.add(r);}
}
/**
* Finally
*/
public class Reporter implements Runnable{
private CarQueue cars;
public Reporter(CarQueue queue){cars=queue;}
public void run(){
try{
while(!Thread.interrupted()){
Car car=cars.take();
System.out.println(car+" finished!");
}
}catch(InterruptedException ie){
System.out.println("Reporter interrupted!");
}
System.out.println("Reporter exit!");
}
}
/**
* MAIN
* @param args
*/
public static void main(String[] args){
System.out.println("Type Entry to stop!");
Exercise37 ex=new Exercise37();
ExecutorService exec=Executors.newCachedThreadPool();
RobotPool pool=ex.new RobotPool();
CarQueue carQueue=ex.new CarQueue();
CarQueue finishQueue=ex.new CarQueue();
for(int i=0;i<5;i++){
exec.execute(ex.new Assembler(pool,carQueue,finishQueue));
}
exec.execute(ex.new EngineRobot(pool));
exec.execute(ex.new DriveTrainRobot(pool));
exec.execute(ex.new WheelRobot(pool));
exec.execute(ex.new ExhaustRobot(pool));
exec.execute(ex.new BodyRobot(pool));
exec.execute(ex.new FenderRobot(pool));
exec.execute(ex.new ChassisBuilder(carQueue));
exec.execute(ex.new Reporter(finishQueue));
try{
System.in.read();
}catch(IOException ioe){
System.out.println("Test interrupted!");
}
exec.shutdownNow();
}
}
和第37题生产汽车不一样的一点是,生产汽车先装车窗还是先装轮子,顺序不重要。但造房子必须先打地基,再架钢结构,再布管线,再浇灌混凝土,最后装饰,一切按步骤进行。所以这题在hire()函数中绑定包工队后,项目经理会到包工队的锁上等待完工。包工队完工后会notifyAll()和经理握手,然后经理再找下一环节的包工队。所以经理和每个包工队两次握手,第一次握手 – 经理把专项子工程交给包工队,第二次握手 – 包工队完工交付任务。
这题另一个收获是自发体会到了“生产者-消费者”模型要互相跑到对方的锁上叫醒对方,确实是比较合理的一种构型,可以当做一种惯用法。
/**
* Exercise 38 house building with concurrent
*/
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
import java.io.*;
public class Exercise38{
/**
* House
*/
public class House{
private String owner;
private Draw draw;
private boolean designed=false, base=false, structure=false, pipeline=false, wall=false, decoration=false;
public House(String name){
owner=name;
}
public String toString(){
StringBuilder sb=new StringBuilder();
sb.append("["+owner);
if(designed){sb.append(" Designed!");}
if(base){sb.append(" Based!");}
if(structure){sb.append(" Structured!");}
if(pipeline){sb.append(" Pipelined!");}
if(wall){sb.append(" Walled!");}
if(decoration){sb.append(" Decorated!");}
return sb.append("]").toString();
}
//all atomic
public synchronized void bindingDraw(Draw d){
draw=d;
designed=true;
}
public synchronized void addBase(){base=true;}
public synchronized void addStructure(){structure=true;}
public synchronized void addPipeline(){pipeline=true;}
public synchronized void addWall(){wall=true;}
public synchronized void addDecoration(){decoration=true;}
public synchronized boolean getBase(){return base;}
public synchronized boolean getStructure(){return structure;}
public synchronized boolean getPipeline(){return pipeline;}
public synchronized boolean getWall(){return wall;}
public synchronized boolean getDecoration(){return decoration;}
}
public class Market implements Runnable{
private HouseQueue orders;
public Market(HouseQueue q){orders=q;}
public void run(){
try{
for(int i=0;i<10;i++){
House house=new House(getName(10));
System.out.println("New house Order: "+house);
orders.put(house);
TimeUnit.MILLISECONDS.sleep(100);
}
}catch(InterruptedException ie){
System.out.println("Test interrupted!");
}
System.out.println("Test exit!");
}
}
/**
* Two main queues
*/
public class HouseQueue extends LinkedBlockingQueue<House>{
private static final long serialVersionUID=0;
}
/**
* Designer
*/
public class Draw{
private House house;
private Designer designer;
public Draw(House h,Designer d){house=h;designer=d;}
public String toString(){return house+" designed by "+designer;}
public Designer getDesigner(){return designer;}
public House getHouse(){return house;}
}
public class Designer implements Runnable{
private String name;
private HouseQueue orders;
private HouseQueue creations;
public Designer(String n,HouseQueue o,HouseQueue c){name=n;orders=o;creations=c;}
public String toString(){return "Designer "+name;}
public String getName(){return name;}
public synchronized void draw(House h) throws InterruptedException{
TimeUnit.MILLISECONDS.sleep(100);
h.bindingDraw(new Draw(h,this));
}
public void run(){
try{
while(!Thread.interrupted()){
House h=orders.take(); //block
System.out.println(this+" get new order: "+h);
draw(h);
System.out.println(h+" designed by "+this);
creations.put(h); //block
}
}catch(InterruptedException ie){
System.out.println("Designer interrupted!");
}
System.out.println("Designer exit!");
}
}
/**
* Manager
*/
public class Manager implements Runnable{
private String name;
private HouseQueue creations;
private HouseQueue finished;
private List<Class<? extends Contractor>> plan;
private House project;
private ContractorPool humanResource;
public Manager(String n,HouseQueue c,HouseQueue f,List<Class<? extends Contractor>> p,ContractorPool h){
name=n;
creations=c;
finished=f;
plan=p;
humanResource=h;
}
public String toString(){return "Manager "+name;}
public House getProject(){return project;}
public synchronized void bindingProject(House house){project=house;}
public synchronized void finishProject() throws InterruptedException{
finished.put(project); //never block
project=null;
}
public synchronized void vacation() throws InterruptedException{
TimeUnit.MILLISECONDS.sleep(10);
}
public void run(){ //thread
try{
while(!Thread.interrupted()){
House house=creations.take(); //block
System.out.println(this+" take the project of "+house);
bindingProject(house);
for(Class<? extends Contractor> task:plan){
humanResource.hire(task,this);
}
finishProject();
vacation();
}
}catch(InterruptedException ie){
System.out.println("Manager interrupted!");
}
System.out.println("Manager exit!");
}
}
/**
* Contractor
*/
public abstract class Contractor implements Runnable{
private String name;
private Manager manager;
private boolean ready=false;
protected ContractorPool pool;
public Contractor(String n, ContractorPool p){name=n;pool=p;}
public synchronized void bindingManager(Manager m){
pool.remove(this);
manager=m;
}
public synchronized void decouplingManager(){
pool.add(this);
manager=null;
}
public synchronized Manager getManager(){return manager;}
public synchronized String getName(){return name;}
public boolean isReady(){return ready;}
public synchronized void ready() throws InterruptedException{
decouplingManager();
ready=true;
synchronized(pool){
notifyAll();
}
System.out.println(this+" is ready!");
wait();
}
public abstract void work() throws InterruptedException;
public void run(){
try{
while(!Thread.interrupted()){
ready(); //block
work();
}
}catch(InterruptedException ie){
System.out.println(this+" Interrupted!");
}
System.out.println(this+" exit!");
}
}
public class BaseContractor extends Contractor{
public BaseContractor(String n,ContractorPool p){super(n,p);}
public String toString(){return "Base contractor "+getName();}
public synchronized void work() throws InterruptedException{
Manager manager=getManager();
manager.getProject().addBase();
System.out.println(manager.getProject()+" Base well constructed by "+this);
notifyAll();
}
}
public class StructureContractor extends Contractor{
public StructureContractor(String n,ContractorPool p){super(n,p);}
public String toString(){return "Structure contractor "+getName();}
public synchronized void work() throws InterruptedException{
Manager manager=getManager();
manager.getProject().addStructure();
System.out.println(manager.getProject()+" Structure well constructed by "+this);
notifyAll();
}
}
public class PipelineContractor extends Contractor{
public PipelineContractor(String n,ContractorPool p){super(n,p);}
public String toString(){return "Pipeline contractor "+getName();}
public synchronized void work() throws InterruptedException{
Manager manager=getManager();
manager.getProject().addPipeline();
System.out.println(manager.getProject()+" Pipeline well constructed by "+this);
notifyAll();
}
}
public class WallContractor extends Contractor{
public WallContractor(String n,ContractorPool p){super(n,p);}
public String toString(){return "Wall contractor "+getName();}
public synchronized void work() throws InterruptedException{
Manager manager=getManager();
manager.getProject().addWall();
System.out.println(manager.getProject()+" Wall well constructed by "+this);
notifyAll();
}
}
public class DecorationContractor extends Contractor{
public DecorationContractor(String n,ContractorPool p){super(n,p);}
public String toString(){return "Decoration contractor "+getName();}
public synchronized void work() throws InterruptedException{
Manager manager=getManager();
manager.getProject().addDecoration();
System.out.println(manager.getProject()+" Decoration well constructed by "+this);
notifyAll(); //tell manager this work is done
}
}
public class ContractorPool{
private List<Contractor> pool=new ArrayList<Contractor>();
public List<Contractor> getContractors(){return pool;}
public synchronized void add(Contractor c){pool.add(c);notifyAll();}
public synchronized void remove(Contractor c){pool.remove(c);}
public void hire(Class<? extends Contractor> task, Manager manager) throws InterruptedException{
for(Contractor contractor:pool){
if(contractor.getClass().equals(task)){
synchronized(contractor){
contractor.bindingManager(manager);
System.out.println(manager+" hire "+contractor);
contractor.notifyAll(); //call contractor to work
contractor.wait(); //block, wait contractor to finish the work
System.out.println("Bye bye "+contractor);
return;
}
}
wait();
hire(task,manager);
}
}
}
/**
* Acceptance
*/
public class Acceptance implements Runnable{
private String name;
private HouseQueue finished;
public Acceptance(String n,HouseQueue h){name=n;finished=h;}
public String toString(){return "Acceptance "+name;}
public String getName(){return name;}
public void run(){
try{
while(!Thread.interrupted()){
House house=finished.take(); //block
System.out.println(house+" finished!");
}
}catch(InterruptedException ie){
System.out.println(this+" Interrupted!");
}
System.out.println(this+" exit!");
}
}
public String getName(int length){
Random rand=new Random();
StringBuilder str=new StringBuilder();
for(int i=0;i<length;i++){
str.append((char)(((int)'a')+rand.nextInt(26)));
}
return str.toString();
}
public static void main(String[] args){
System.out.println("Press Entry to exit!");
Exercise38 ex=new Exercise38();
ExecutorService exec=Executors.newCachedThreadPool();
HouseQueue orders=ex.new HouseQueue();
HouseQueue creations=ex.new HouseQueue();
HouseQueue finished=ex.new HouseQueue();
List<Class<? extends Contractor>> plan=new ArrayList<Class<? extends Contractor>>();
plan.add(BaseContractor.class);
plan.add(StructureContractor.class);
plan.add(PipelineContractor.class);
plan.add(WallContractor.class);
plan.add(DecorationContractor.class);
ContractorPool humanResource=ex.new ContractorPool();
exec.execute(ex.new BaseContractor(ex.getName(1),humanResource));
exec.execute(ex.new StructureContractor(ex.getName(1),humanResource));
exec.execute(ex.new PipelineContractor(ex.getName(1),humanResource));
exec.execute(ex.new WallContractor(ex.getName(1),humanResource));
exec.execute(ex.new DecorationContractor(ex.getName(1),humanResource));
exec.execute(ex.new Market(orders)); //market thread
for(int i=0;i<5;i++) { //designer thread
Designer d=ex.new Designer(ex.getName(7),orders,creations);
exec.execute(d);
}
for(int i=0;i<3;i++){ //manager thread
Manager manager=ex.new Manager(ex.getName(5),creations,finished,plan,humanResource);
exec.execute(manager);
}
exec.execute(ex.new Acceptance(ex.getName(5),finished));
try{
System.in.read();
}catch(IOException ioe){
System.out.println("Test interrupted!");
}
System.out.println("Test exit!");
exec.shutdownNow();
}
}
首先Lock和Atomic肯定是不一样的。Atomic的效果其实非常有限,书上的FastSimulation.java的互斥临界区非常小,只能保证在获取了oldValue后,然后计算出newValue之前,如果其他线程改变了oldValue的值,Atomic才会发现他的原子性遭到了破坏。但这样并不能保证整个操作是线程安全的。因为newValue是previous和next和current三者的平均值,这里如果是next或者previous中途被修改,Atomic是无法察觉的。但实际上几个Evoler的操作已经被互相影响了。所以compareAndSet()仅仅确保oldValue没有被篡改并不是线程安全的。
但ReentrantLock完全不同,被lock()锁上的代码都能保证不可能同时有两个线程操作这段代码,所以是线程安全的。previous, oldValue, next都不会被中途篡改。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.atomic.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.*;
import java.io.*;
public class Exercise39{
private static final int N_ELEMENTS=100000;
private static final int N_GENES=30;
private static final int N_EVOLVERS=50;
private static final int[][] GRID=new int[N_ELEMENTS][N_GENES];
private static Random rand=new Random();
public static class Evoler implements Runnable{
public void run(){
while(!Thread.interrupted()){
int element=rand.nextInt(N_ELEMENTS);
for(int i=0;i<N_GENES;i++){
int previous=i-1;
if(previous<0){previous=N_GENES-1;}
int next=i+1;
if(next==N_GENES){next=0;}
Lock lock=new ReentrantLock();
lock.lock();
try{
int oldValue=GRID[element][i];
int newValue=(oldValue+GRID[element][previous]+GRID[element][next])/3;
GRID[element][i]=newValue;
System.out.println(oldValue+" replaced by "+newValue);
}finally{
lock.unlock();
}
}
}
}
}
public static void main(String[] args){
System.out.println("Press Entry to Exit!");
ExecutorService exec=Executors.newCachedThreadPool();
for(int i=0;i<N_ELEMENTS;i++){
for(int j=0;j<N_GENES;j++){
GRID[i][j]=rand.nextInt(1000);
}
}
for(int i=0;i<N_EVOLVERS;i++){
exec.execute(new Evoler());
}
try{
System.in.read();
}catch(IOException ie){
System.out.println("Test interrupted!");
}
exec.shutdownNow();
System.out.println("Test exit.");
}
}
这题,虽然能正常运行,但遗留了一个小问题:运行结果getReadLockCount()显示一直为0。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.locks.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.*;
public class Exercise40<K,V>{
//ReadWriteMap
public class ReadWriteMap<K,V>{
private Map<K,V> lockedMap=new HashMap<K,V>();
private ReentrantReadWriteLock lock=new ReentrantReadWriteLock(true);
private Random rand=new Random();
public ReadWriteMap(Collection<K> c1, Collection<V> c2){
if(c1.size()!=c2.size()){System.out.println("Check your collections!");return;}
Iterator<K> iteKey=c1.iterator();
Iterator<V> iteValue=c2.iterator();
for(int i=0;i<c1.size();i++){
lockedMap.put(iteKey.next(),iteValue.next());
}
}
public String toString(){
StringBuilder sb=new StringBuilder();
for(Map.Entry<K,V> entry:lockedMap.entrySet()){
sb.append("["+entry.getKey()+","+entry.getValue()+"] ");
}
return sb.toString();
}
public V read(K key){
Lock readLock=lock.readLock();
readLock.lock();
try{
return lockedMap.get(key);
}finally{
readLock.unlock();
}
}
public void write(K key, V value){
Lock writeLock=lock.writeLock();
writeLock.lock();
try{
lockedMap.put(key,value);
}finally{
writeLock.unlock();
}
}
public Set<Map.Entry<K,V>> entrySet(){return lockedMap.entrySet();}
public ReentrantReadWriteLock getLock(){return lock;}
}
//Reader
public class Reader implements Runnable{
public void run(){
try{
for(Map.Entry<Integer,String> entry:map.entrySet()){
String value=map.read(entry.getKey());
System.out.println(value+" is readed! >>>"+map.getLock().getReadLockCount()+" threads is on Read Lock!");
TimeUnit.MILLISECONDS.sleep(0);
}
}catch(InterruptedException ie){
System.out.println("Reader interrupted!");
}
System.out.println("Reader exit!");
}
}
//Writer
public class Writer implements Runnable{
public void run(){
try{
for(Map.Entry<Integer,String> entry:map.entrySet()){
char rc=(char)(((int)'a')+rand.nextInt(26));
String str=Character.toString(rc);
map.write(entry.getKey(),str);
System.out.println(entry.getValue()+" is changed to "+str+" >>>"+map.getLock().getWriteHoldCount()+" threads is on Write Lock!");
TimeUnit.MILLISECONDS.sleep(0);
}
}catch(InterruptedException ie){
System.out.println("Writer interrupted!");
}
System.out.println("Writer exit!");
}
}
//fields
private ExecutorService exec=Executors.newCachedThreadPool();
private Random rand=new Random();
private ReadWriteMap<Integer,String> map;
private final int SIZE;
private final int readTime;
private final int writeTime;
private Exercise40(int size, int rt, int wt){
SIZE=size;
List<Integer> keys=new ArrayList<Integer>();
List<String> values=new ArrayList<String>();
for(int i=0;i<SIZE;i++){
keys.add(rand.nextInt(1000));
char rc=(char)(((int)'a')+rand.nextInt(26));
values.add(Character.toString(rc));
}
map=new ReadWriteMap<Integer,String>(keys,values);
readTime=rt;
writeTime=wt;
for(int i=0;i<readTime;i++){
exec.execute(new Reader());
}
for(int i=0;i<writeTime;i++){
exec.execute(new Writer());
}
try{
TimeUnit.MILLISECONDS.sleep(5);
}catch(InterruptedException ie){
System.out.println("Test interrupted!");
}
exec.shutdownNow();
System.out.println("Test exit!");
}
public static void main(String[] args){
Exercise40 test=new Exercise40(50,20,10);
}
}
两个存放Future结果的列表作为成员字段存放着,showResults()方法负责从结果列表中获取结果。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise41{
private ExecutorService exec = Executors.newSingleThreadExecutor();
private List<Future<String>> intResults=new CopyOnWriteArrayList<Future<String>>();
private List<Future<String>> floatResults=new CopyOnWriteArrayList<Future<String>>();
private Random rand=new Random();
//calculateInt
public void calculateInt(final int a, final int b){
intResults.add(exec.submit(new Callable<String>(){
public String call(){
sleep(100);
return " "+a+" + "+b+" = "+(a+b);
}
}));
}
//calculateFloat
public void calculateFloat(final float a, final float b){
floatResults.add(exec.submit(new Callable<String>(){
public String call(){
sleep(100);
return " "+a+" + "+b+" = "+(a+b);
}
}));
}
//tools
public void sleep(int time){
try{
TimeUnit.MILLISECONDS.sleep(time);
}catch(InterruptedException ie){
System.out.println("Calculator is interrupted!");
}
}
public void shutdown(){exec.shutdownNow();}
//message handler
public void showResults(){
long endAt=System.currentTimeMillis()+5000; //5 secs
while(true){
for(Future<String> r:intResults){
if(r.isDone()){
try{
System.out.println(r.get());
}catch(Exception e){
throw new RuntimeException();
}
intResults.remove(r);
}
}
for(Future<String> r:floatResults){
if(r.isDone()){
try{
System.out.println(r.get());
}catch(Exception e){
throw new RuntimeException();
}
floatResults.remove(r);
}
}
if(System.currentTimeMillis()>=endAt){
break;
}
sleep(100);
}
}
public static void main(String[] args){
Exercise41 test=new Exercise41();
for(int i=0;i<5;i++){
test.calculateInt(test.rand.nextInt(100),test.rand.nextInt(100));
test.calculateFloat((test.rand.nextFloat()*test.rand.nextInt(100)),(test.rand.nextFloat()*test.rand.nextInt(100)));
}
test.showResults();
test.shutdown();
}
}
这里每个机器人ActiveCarRobot就是一个活动对象,每个机器人负责一辆车,它有自己的单线程Executor,有自己的任务队列和结果队列,交替执行10次上蜡和抛光之后,显示结果。
package com.ciaoshen.thinkinjava.chapter21;
import java.util.concurrent.*;
import java.util.*;
public class Exercise42{
private static int carCount=0;
private static int robotCount=0;
private static List<ActiveCarRobot> robots=new ArrayList<ActiveCarRobot>();
public class Car{
private final int id=++carCount;
private boolean waxOn=false;
public void waxOn(){
if(waxOn){System.out.println("Error, the wax already on!");return;}
waxOn=true;
}
public void waxOff(){
if(!waxOn){System.out.println("Error, should waxOn before waxOff!");return;}
waxOn=false;
}
public String toString(){return "Car#"+id;}
}
public class ActiveCarRobot implements Runnable{
private final int id=++robotCount;
private final ExecutorService exec=Executors.newSingleThreadExecutor();
private List<Future<String>> results=new CopyOnWriteArrayList<Future<String>>();
private Car car;
public ActiveCarRobot(Car c){car=c;robots.add(this);}
public String toString(){return "Robot#"+id;}
public void run(){
for(int i=0;i<10;i++){
results.add(waxOn());
sleep(10);
results.add(waxOff());
}
showResults();
shutdown();
}
public Future<String> waxOn(){
return exec.submit(new Callable<String>(){
public String call(){
sleep(10);
car.waxOn();
return " "+car+" wax on by "+ActiveCarRobot.this;
}
});
}
public Future<String> waxOff(){
return exec.submit(new Callable<String>(){
public String call(){
sleep(10);
car.waxOff();
return " "+car+" wax off by "+ActiveCarRobot.this;
}
});
}
public void sleep(int time){
try{
TimeUnit.MILLISECONDS.sleep(time);
}catch(InterruptedException ie){
System.out.println(this+" interrupted!");
}
}
public void shutdown(){exec.shutdownNow();}
public void showResults(){
long endAt=System.currentTimeMillis()+5000;
while(true){
for(Future<String> f:results){
if(f.isDone()){
try{
System.out.println(f.get());
}catch(Exception e){
System.out.println("Error when reading the results!");
}
}
results.remove(f);
}
if(System.currentTimeMillis()>=endAt){break;}
}
}
}
public static void main(String[] args){
Exercise42 test=new Exercise42();
ExecutorService exec=Executors.newCachedThreadPool();
for(int i=0;i<10;i++){
exec.execute(test.new ActiveCarRobot(test.new Car()));
}
try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException ie){
System.out.println("Test interrupted!");
}
exec.shutdownNow();
}
}